notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

lib.rs (15672B)


      1 use enostr::Pubkey;
      2 use nostrdb::Note;
      3 use rustc_hash::FxHashMap;
      4 use std::thread;
      5 use std::time::{Duration, Instant};
      6 
      7 use crossbeam_channel as chan;
      8 
      9 use nostrdb::{Filter, Ndb, Transaction};
     10 use notedeck::{AppContext, AppResponse};
     11 
     12 use chrono::{Datelike, TimeZone, Utc};
     13 
     14 mod chart;
     15 mod sparkline;
     16 mod ui;
     17 
     18 // ----------------------
     19 // Worker protocol
     20 // ----------------------
     21 
     22 #[derive(Debug)]
     23 enum WorkerCmd {
     24     Refresh,
     25     //Quit,
     26 }
     27 
     28 // Buckets are multiples of time ranges
     29 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     30 pub enum Period {
     31     Daily,
     32     Weekly,
     33     Monthly,
     34 }
     35 
     36 impl Period {
     37     pub const ALL: [Period; 3] = [Period::Daily, Period::Weekly, Period::Monthly];
     38 
     39     pub fn label(self) -> &'static str {
     40         match self {
     41             Period::Daily => "day",
     42             Period::Weekly => "week",
     43             Period::Monthly => "month",
     44         }
     45     }
     46 }
     47 
     48 /// All the data we are interested in for a specific range
     49 #[derive(Default, Clone, Debug)]
     50 struct Bucket {
     51     pub total: u64,
     52     pub kinds: rustc_hash::FxHashMap<u64, u32>,
     53     pub clients: rustc_hash::FxHashMap<String, u32>,
     54     pub kind1_authors: rustc_hash::FxHashMap<Pubkey, u32>,
     55 }
     56 
     57 fn note_client_tag<'a>(note: &Note<'a>) -> Option<&'a str> {
     58     for tag in note.tags() {
     59         if tag.count() < 2 {
     60             continue;
     61         }
     62 
     63         let Some("client") = tag.get_str(0) else {
     64             continue;
     65         };
     66 
     67         return tag.get_str(1);
     68     }
     69 
     70     None
     71 }
     72 
     73 impl Bucket {
     74     #[inline(always)]
     75     pub fn bump(&mut self, note: &Note<'_>) {
     76         self.total += 1;
     77         let kind = note.kind();
     78         *self.kinds.entry(kind as u64).or_default() += 1;
     79 
     80         // Track kind1 authors
     81         if kind == 1 {
     82             let pk = Pubkey::new(*note.pubkey());
     83             *self.kind1_authors.entry(pk).or_default() += 1;
     84         }
     85 
     86         if let Some(client) = note_client_tag(note) {
     87             *self.clients.entry(client.to_string()).or_default() += 1;
     88         } else {
     89             // TODO(jb55): client fingerprinting ?
     90         }
     91     }
     92 }
     93 
     94 // bucket_end_ts(idx) - self.bucket_size_secs
     95 #[derive(Debug, Clone, Default)]
     96 struct RollingCache {
     97     pub bucket_size_secs: i64,
     98     pub anchor_end_ts: i64,
     99     pub buckets: Vec<Bucket>,
    100 }
    101 
    102 impl RollingCache {
    103     pub fn bucket_end_ts(&self, idx: usize) -> i64 {
    104         self.anchor_end_ts - (idx as i64) * self.bucket_size_secs
    105     }
    106 
    107     pub fn bucket_start_ts(&self, idx: usize) -> i64 {
    108         self.bucket_end_ts(idx) - self.bucket_size_secs
    109     }
    110 
    111     pub fn daily(now_ts: i64, days: usize) -> Self {
    112         let day_anchor = next_midnight_utc(now_ts);
    113 
    114         Self {
    115             bucket_size_secs: 86_400,
    116             anchor_end_ts: day_anchor,
    117             buckets: vec![Bucket::default(); days],
    118         }
    119     }
    120 
    121     pub fn weekly(now_ts: i64, weeks: usize, week_starts_monday: bool) -> Self {
    122         let anchor_end_ts = next_week_boundary_utc(now_ts, week_starts_monday);
    123         Self {
    124             bucket_size_secs: 7 * 86_400,
    125             anchor_end_ts,
    126             buckets: vec![Bucket::default(); weeks],
    127         }
    128     }
    129 
    130     // “month-ish” (30d buckets) but aligned so bucket 0 ends at the next month boundary
    131     pub fn monthly_30d(now_ts: i64, months: usize) -> Self {
    132         let anchor_end_ts = next_month_boundary_utc(now_ts);
    133         Self {
    134             bucket_size_secs: 30 * 86_400,
    135             anchor_end_ts,
    136             buckets: vec![Bucket::default(); months],
    137         }
    138     }
    139 
    140     #[inline(always)]
    141     pub fn bump(&mut self, note: &Note<'_>) {
    142         let ts = note.created_at() as i64;
    143 
    144         // bucket windows are [end-(i+1)*size, end-i*size)
    145         // so treat `end` itself as "future"
    146         let delta = (self.anchor_end_ts - 1) - ts;
    147 
    148         if delta < 0 {
    149             return; // ignore future timestamps
    150         }
    151 
    152         let idx = (delta / self.bucket_size_secs) as usize;
    153         if idx >= self.buckets.len() {
    154             return; // outside window
    155         }
    156 
    157         self.buckets[idx].bump(note);
    158     }
    159 }
    160 
    161 #[derive(Clone, Debug, Default)]
    162 struct DashboardState {
    163     total: Bucket,
    164     daily: RollingCache,
    165     weekly: RollingCache,
    166     monthly: RollingCache,
    167 }
    168 
    169 #[derive(Debug, Clone)]
    170 struct Snapshot {
    171     started_at: Instant,
    172     snapshot_at: Instant,
    173     state: DashboardState,
    174 }
    175 
    176 #[derive(Debug)]
    177 enum WorkerMsg {
    178     Snapshot(Snapshot),
    179     Finished {
    180         started_at: Instant,
    181         finished_at: Instant,
    182         state: DashboardState,
    183     },
    184     Failed {
    185         started_at: Instant,
    186         finished_at: Instant,
    187         error: String,
    188     },
    189 }
    190 
    191 // ----------------------
    192 // Dashboard (single pass, single worker)
    193 // ----------------------
    194 
    195 pub struct Dashboard {
    196     initialized: bool,
    197 
    198     // Worker channels
    199     cmd_tx: Option<chan::Sender<WorkerCmd>>,
    200     msg_rx: Option<chan::Receiver<WorkerMsg>>,
    201 
    202     // Refresh policy
    203     refresh_every: Duration,
    204     next_tick: Instant,
    205 
    206     // Global UI controls
    207     period: Period,
    208 
    209     // UI state (progressively filled via snapshots)
    210     running: bool,
    211 
    212     last_started: Option<Instant>,
    213     last_snapshot: Option<Instant>,
    214     last_finished: Option<Instant>,
    215     last_duration: Option<Duration>,
    216     last_error: Option<String>,
    217 
    218     state: DashboardState,
    219 }
    220 
    221 impl Default for Dashboard {
    222     fn default() -> Self {
    223         Self {
    224             initialized: false,
    225 
    226             period: Period::Weekly,
    227 
    228             cmd_tx: None,
    229             msg_rx: None,
    230 
    231             refresh_every: Duration::from_secs(300),
    232             next_tick: Instant::now(),
    233 
    234             running: false,
    235             last_started: None,
    236             last_snapshot: None,
    237             last_finished: None,
    238             last_duration: None,
    239             last_error: None,
    240 
    241             state: DashboardState::default(),
    242         }
    243     }
    244 }
    245 
    246 impl notedeck::App for Dashboard {
    247     fn update(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) {
    248         if !self.initialized {
    249             self.initialized = true;
    250             self.init(egui_ctx.clone(), ctx);
    251         }
    252 
    253         self.process_worker_msgs();
    254         self.schedule_refresh();
    255     }
    256 
    257     fn render(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse {
    258         self.show(ui, ctx);
    259         AppResponse::none()
    260     }
    261 }
    262 
    263 impl Dashboard {
    264     fn selected_cache(&self) -> &RollingCache {
    265         match self.period {
    266             Period::Daily => &self.state.daily,
    267             Period::Weekly => &self.state.weekly,
    268             Period::Monthly => &self.state.monthly,
    269         }
    270     }
    271 
    272     fn init(&mut self, egui_ctx: egui::Context, ctx: &mut AppContext<'_>) {
    273         // spawn single worker thread and keep it alive
    274         let (cmd_tx, cmd_rx) = chan::unbounded::<WorkerCmd>();
    275         let (msg_tx, msg_rx) = chan::unbounded::<WorkerMsg>();
    276 
    277         self.cmd_tx = Some(cmd_tx.clone());
    278         self.msg_rx = Some(msg_rx);
    279 
    280         // Clone the DB handle into the worker thread (Ndb is typically cheap/cloneable)
    281         let ndb = ctx.ndb.clone();
    282 
    283         spawn_worker(egui_ctx, ndb, cmd_rx, msg_tx);
    284 
    285         // kick the first run immediately
    286         let _ = cmd_tx.send(WorkerCmd::Refresh);
    287         self.running = true;
    288         self.last_error = None;
    289         self.last_started = Some(Instant::now());
    290         self.last_snapshot = None;
    291         self.last_finished = None;
    292         self.last_duration = None;
    293         self.state = DashboardState::default();
    294     }
    295 
    296     fn process_worker_msgs(&mut self) {
    297         let Some(rx) = &self.msg_rx else { return };
    298 
    299         let mut got_any = false;
    300 
    301         while let Ok(msg) = rx.try_recv() {
    302             got_any = true;
    303             match msg {
    304                 WorkerMsg::Snapshot(s) => {
    305                     self.running = true;
    306                     self.last_started = Some(s.started_at);
    307                     self.last_snapshot = Some(s.snapshot_at);
    308                     self.last_error = None;
    309 
    310                     self.state = s.state;
    311                 }
    312                 WorkerMsg::Finished {
    313                     started_at,
    314                     finished_at,
    315                     state,
    316                 } => {
    317                     self.running = false;
    318                     self.last_started = Some(started_at);
    319                     self.last_snapshot = Some(finished_at);
    320                     self.last_finished = Some(finished_at);
    321                     self.last_duration = Some(finished_at.saturating_duration_since(started_at));
    322                     self.last_error = None;
    323 
    324                     self.state = state;
    325                 }
    326                 WorkerMsg::Failed {
    327                     started_at,
    328                     finished_at,
    329                     error,
    330                 } => {
    331                     self.running = false;
    332                     self.last_started = Some(started_at);
    333                     self.last_snapshot = Some(finished_at);
    334                     self.last_finished = Some(finished_at);
    335                     self.last_duration = Some(finished_at.saturating_duration_since(started_at));
    336                     self.last_error = Some(error);
    337                 }
    338             }
    339         }
    340 
    341         if got_any {
    342             // No-op; we already requested repaint on every message.
    343         }
    344     }
    345 
    346     fn schedule_refresh(&mut self) {
    347         // throttle scheduling checks a bit
    348         let now = Instant::now();
    349         if now < self.next_tick {
    350             return;
    351         }
    352         self.next_tick = now + Duration::from_millis(200);
    353 
    354         if self.running {
    355             return;
    356         }
    357 
    358         // refresh every 30 seconds from the last finished time (or from init)
    359         let last = self
    360             .last_finished
    361             .or(self.last_started)
    362             .unwrap_or_else(Instant::now);
    363 
    364         if now.saturating_duration_since(last) >= self.refresh_every
    365             && let Some(tx) = &self.cmd_tx
    366         {
    367             // reset UI fields for progressive load, but keep old values visible until snapshots arrive
    368             self.running = true;
    369             self.last_error = None;
    370             self.last_started = Some(now);
    371             self.last_snapshot = None;
    372             self.last_finished = None;
    373             self.last_duration = None;
    374             self.state = DashboardState::default();
    375 
    376             let _ = tx.send(WorkerCmd::Refresh);
    377         }
    378     }
    379 
    380     fn show(&mut self, ui: &mut egui::Ui, ctx: &mut AppContext<'_>) {
    381         crate::ui::dashboard_ui(self, ui, ctx);
    382     }
    383 }
    384 
    385 // ----------------------
    386 // Worker side (single pass, periodic snapshots)
    387 // ----------------------
    388 
    389 fn spawn_worker(
    390     ctx: egui::Context,
    391     ndb: Ndb,
    392     cmd_rx: chan::Receiver<WorkerCmd>,
    393     msg_tx: chan::Sender<WorkerMsg>,
    394 ) {
    395     thread::Builder::new()
    396         .name("dashboard-worker".to_owned())
    397         .spawn(move || {
    398             let mut should_quit = false;
    399 
    400             while !should_quit {
    401                 match cmd_rx.recv() {
    402                     Ok(WorkerCmd::Refresh) => {
    403                         let started_at = Instant::now();
    404 
    405                         match materialize_single_pass(&ctx, &ndb, &msg_tx, started_at) {
    406                             Ok(state) => {
    407                                 let _ = msg_tx.send(WorkerMsg::Finished {
    408                                     started_at,
    409                                     finished_at: Instant::now(),
    410                                     state,
    411                                 });
    412                             }
    413                             Err(e) => {
    414                                 let _ = msg_tx.send(WorkerMsg::Failed {
    415                                     started_at,
    416                                     finished_at: Instant::now(),
    417                                     error: format!("{e:?}"),
    418                                 });
    419                             }
    420                         }
    421                     }
    422                     Err(_) => {
    423                         should_quit = true;
    424                     }
    425                 }
    426             }
    427         })
    428         .expect("failed to spawn dashboard worker thread");
    429 }
    430 
    431 struct Acc {
    432     last_emit: Instant,
    433 
    434     state: DashboardState,
    435 }
    436 
    437 fn materialize_single_pass(
    438     ctx: &egui::Context,
    439     ndb: &Ndb,
    440     msg_tx: &chan::Sender<WorkerMsg>,
    441     started_at: Instant,
    442 ) -> Result<DashboardState, nostrdb::Error> {
    443     // one transaction per refresh run
    444     let txn = Transaction::new(ndb)?;
    445 
    446     // all notes
    447     let filters = vec![Filter::new_with_capacity(1).build()];
    448 
    449     let days = 14;
    450     let weeks = 12;
    451     let months = 12;
    452     let week_starts_monday = true;
    453 
    454     let now = Utc::now().timestamp();
    455 
    456     let mut acc = Acc {
    457         last_emit: Instant::now(),
    458         state: DashboardState {
    459             total: Bucket::default(),
    460             daily: RollingCache::daily(now, days),
    461             weekly: RollingCache::weekly(now, weeks, week_starts_monday),
    462             monthly: RollingCache::monthly_30d(now, months),
    463         },
    464     };
    465 
    466     let emit_every = Duration::from_millis(32);
    467 
    468     let _ = ndb.fold(&txn, &filters, &mut acc, |acc, note| {
    469         acc.state.total.bump(&note);
    470         acc.state.daily.bump(&note);
    471         acc.state.weekly.bump(&note);
    472         acc.state.monthly.bump(&note);
    473 
    474         let now = Instant::now();
    475         if now.saturating_duration_since(acc.last_emit) >= emit_every {
    476             acc.last_emit = now;
    477 
    478             let _ = msg_tx.send(WorkerMsg::Snapshot(Snapshot {
    479                 started_at,
    480                 snapshot_at: now,
    481                 state: acc.state.clone(),
    482             }));
    483 
    484             ctx.request_repaint();
    485         }
    486 
    487         acc
    488     });
    489 
    490     Ok(acc.state)
    491 }
    492 
    493 fn next_midnight_utc(now_ts: i64) -> i64 {
    494     let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap();
    495     let tomorrow = dt.date_naive().succ_opt().unwrap();
    496     Utc.from_utc_datetime(&tomorrow.and_hms_opt(0, 0, 0).unwrap())
    497         .timestamp()
    498 }
    499 
    500 fn next_week_boundary_utc(now_ts: i64, starts_monday: bool) -> i64 {
    501     let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap();
    502     let today = dt.date_naive();
    503 
    504     let start = if starts_monday {
    505         chrono::Weekday::Mon
    506     } else {
    507         chrono::Weekday::Sun
    508     };
    509     let weekday = today.weekday();
    510 
    511     // days until next week start (0..6); if today is start, boundary is next week start (7 days)
    512     let mut delta =
    513         (7 + (start.num_days_from_monday() as i32) - (weekday.num_days_from_monday() as i32)) % 7;
    514     if delta == 0 {
    515         delta = 7;
    516     }
    517 
    518     let next = today + chrono::Duration::days(delta as i64);
    519     Utc.from_utc_datetime(&next.and_hms_opt(0, 0, 0).unwrap())
    520         .timestamp()
    521 }
    522 
    523 fn next_month_boundary_utc(now_ts: i64) -> i64 {
    524     let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap();
    525     let y = dt.year();
    526     let m = dt.month();
    527 
    528     let (ny, nm) = if m == 12 { (y + 1, 1) } else { (y, m + 1) };
    529     Utc.with_ymd_and_hms(ny, nm, 1, 0, 0, 0)
    530         .single()
    531         .unwrap()
    532         .timestamp()
    533 }
    534 
    535 fn top_kinds_over(cache: &RollingCache, limit: usize) -> Vec<(u64, u64)> {
    536     let mut agg: FxHashMap<u64, u64> = Default::default();
    537 
    538     for b in &cache.buckets {
    539         for (kind, count) in &b.kinds {
    540             *agg.entry(*kind).or_default() += *count as u64;
    541         }
    542     }
    543 
    544     let mut v: Vec<_> = agg.into_iter().collect();
    545     v.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    546     v.truncate(limit);
    547     v
    548 }
    549 
    550 pub(crate) fn top_kind1_authors_over(cache: &RollingCache, limit: usize) -> Vec<(Pubkey, u64)> {
    551     let mut agg: FxHashMap<Pubkey, u64> = Default::default();
    552     for b in &cache.buckets {
    553         for (pubkey, count) in &b.kind1_authors {
    554             *agg.entry(*pubkey).or_default() += *count as u64;
    555         }
    556     }
    557     let mut v: Vec<_> = agg.into_iter().collect();
    558     v.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    559     v.truncate(limit);
    560     v
    561 }