lib.rs (15673B)
1 use enostr::Pubkey; 2 use nostrdb::Note; 3 use rustc_hash::FxHashMap; 4 use std::thread; 5 use std::time::{Duration, Instant}; 6 7 use crossbeam_channel as chan; 8 9 use nostrdb::{Filter, Ndb, Transaction}; 10 use notedeck::{AppContext, AppResponse, try_process_events_core}; 11 12 use chrono::{Datelike, TimeZone, Utc}; 13 14 mod chart; 15 mod sparkline; 16 mod ui; 17 18 // ---------------------- 19 // Worker protocol 20 // ---------------------- 21 22 #[derive(Debug)] 23 enum WorkerCmd { 24 Refresh, 25 //Quit, 26 } 27 28 // Buckets are multiples of time ranges 29 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 30 pub enum Period { 31 Daily, 32 Weekly, 33 Monthly, 34 } 35 36 impl Period { 37 pub const ALL: [Period; 3] = [Period::Daily, Period::Weekly, Period::Monthly]; 38 39 pub fn label(self) -> &'static str { 40 match self { 41 Period::Daily => "day", 42 Period::Weekly => "week", 43 Period::Monthly => "month", 44 } 45 } 46 } 47 48 /// All the data we are interested in for a specific range 49 #[derive(Default, Clone, Debug)] 50 struct Bucket { 51 pub total: u64, 52 pub kinds: rustc_hash::FxHashMap<u64, u32>, 53 pub clients: rustc_hash::FxHashMap<String, u32>, 54 pub kind1_authors: rustc_hash::FxHashMap<Pubkey, u32>, 55 } 56 57 fn note_client_tag<'a>(note: &Note<'a>) -> Option<&'a str> { 58 for tag in note.tags() { 59 if tag.count() < 2 { 60 continue; 61 } 62 63 let Some("client") = tag.get_str(0) else { 64 continue; 65 }; 66 67 return tag.get_str(1); 68 } 69 70 None 71 } 72 73 impl Bucket { 74 #[inline(always)] 75 pub fn bump(&mut self, note: &Note<'_>) { 76 self.total += 1; 77 let kind = note.kind(); 78 *self.kinds.entry(kind as u64).or_default() += 1; 79 80 // Track kind1 authors 81 if kind == 1 { 82 let pk = Pubkey::new(*note.pubkey()); 83 *self.kind1_authors.entry(pk).or_default() += 1; 84 } 85 86 if let Some(client) = note_client_tag(note) { 87 *self.clients.entry(client.to_string()).or_default() += 1; 88 } else { 89 // TODO(jb55): client fingerprinting ? 90 } 91 } 92 } 93 94 // bucket_end_ts(idx) - self.bucket_size_secs 95 #[derive(Debug, Clone, Default)] 96 struct RollingCache { 97 pub bucket_size_secs: i64, 98 pub anchor_end_ts: i64, 99 pub buckets: Vec<Bucket>, 100 } 101 102 impl RollingCache { 103 pub fn bucket_end_ts(&self, idx: usize) -> i64 { 104 self.anchor_end_ts - (idx as i64) * self.bucket_size_secs 105 } 106 107 pub fn bucket_start_ts(&self, idx: usize) -> i64 { 108 self.bucket_end_ts(idx) - self.bucket_size_secs 109 } 110 111 pub fn daily(now_ts: i64, days: usize) -> Self { 112 let day_anchor = next_midnight_utc(now_ts); 113 114 Self { 115 bucket_size_secs: 86_400, 116 anchor_end_ts: day_anchor, 117 buckets: vec![Bucket::default(); days], 118 } 119 } 120 121 pub fn weekly(now_ts: i64, weeks: usize, week_starts_monday: bool) -> Self { 122 let anchor_end_ts = next_week_boundary_utc(now_ts, week_starts_monday); 123 Self { 124 bucket_size_secs: 7 * 86_400, 125 anchor_end_ts, 126 buckets: vec![Bucket::default(); weeks], 127 } 128 } 129 130 // “month-ish” (30d buckets) but aligned so bucket 0 ends at the next month boundary 131 pub fn monthly_30d(now_ts: i64, months: usize) -> Self { 132 let anchor_end_ts = next_month_boundary_utc(now_ts); 133 Self { 134 bucket_size_secs: 30 * 86_400, 135 anchor_end_ts, 136 buckets: vec![Bucket::default(); months], 137 } 138 } 139 140 #[inline(always)] 141 pub fn bump(&mut self, note: &Note<'_>) { 142 let ts = note.created_at() as i64; 143 144 // bucket windows are [end-(i+1)*size, end-i*size) 145 // so treat `end` itself as "future" 146 let delta = (self.anchor_end_ts - 1) - ts; 147 148 if delta < 0 { 149 return; // ignore future timestamps 150 } 151 152 let idx = (delta / self.bucket_size_secs) as usize; 153 if idx >= self.buckets.len() { 154 return; // outside window 155 } 156 157 self.buckets[idx].bump(note); 158 } 159 } 160 161 #[derive(Clone, Debug, Default)] 162 struct DashboardState { 163 total: Bucket, 164 daily: RollingCache, 165 weekly: RollingCache, 166 monthly: RollingCache, 167 } 168 169 #[derive(Debug, Clone)] 170 struct Snapshot { 171 started_at: Instant, 172 snapshot_at: Instant, 173 state: DashboardState, 174 } 175 176 #[derive(Debug)] 177 enum WorkerMsg { 178 Snapshot(Snapshot), 179 Finished { 180 started_at: Instant, 181 finished_at: Instant, 182 state: DashboardState, 183 }, 184 Failed { 185 started_at: Instant, 186 finished_at: Instant, 187 error: String, 188 }, 189 } 190 191 // ---------------------- 192 // Dashboard (single pass, single worker) 193 // ---------------------- 194 195 pub struct Dashboard { 196 initialized: bool, 197 198 // Worker channels 199 cmd_tx: Option<chan::Sender<WorkerCmd>>, 200 msg_rx: Option<chan::Receiver<WorkerMsg>>, 201 202 // Refresh policy 203 refresh_every: Duration, 204 next_tick: Instant, 205 206 // Global UI controls 207 period: Period, 208 209 // UI state (progressively filled via snapshots) 210 running: bool, 211 212 last_started: Option<Instant>, 213 last_snapshot: Option<Instant>, 214 last_finished: Option<Instant>, 215 last_duration: Option<Duration>, 216 last_error: Option<String>, 217 218 state: DashboardState, 219 } 220 221 impl Default for Dashboard { 222 fn default() -> Self { 223 Self { 224 initialized: false, 225 226 period: Period::Weekly, 227 228 cmd_tx: None, 229 msg_rx: None, 230 231 refresh_every: Duration::from_secs(300), 232 next_tick: Instant::now(), 233 234 running: false, 235 last_started: None, 236 last_snapshot: None, 237 last_finished: None, 238 last_duration: None, 239 last_error: None, 240 241 state: DashboardState::default(), 242 } 243 } 244 } 245 246 impl notedeck::App for Dashboard { 247 fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { 248 try_process_events_core(ctx, ui.ctx(), |_, _| {}); 249 250 if !self.initialized { 251 self.initialized = true; 252 self.init(ui.ctx().clone(), ctx); 253 } 254 255 self.process_worker_msgs(); 256 self.schedule_refresh(); 257 258 self.show(ui, ctx); 259 260 AppResponse::none() 261 } 262 } 263 264 impl Dashboard { 265 fn selected_cache(&self) -> &RollingCache { 266 match self.period { 267 Period::Daily => &self.state.daily, 268 Period::Weekly => &self.state.weekly, 269 Period::Monthly => &self.state.monthly, 270 } 271 } 272 273 fn init(&mut self, egui_ctx: egui::Context, ctx: &mut AppContext<'_>) { 274 // spawn single worker thread and keep it alive 275 let (cmd_tx, cmd_rx) = chan::unbounded::<WorkerCmd>(); 276 let (msg_tx, msg_rx) = chan::unbounded::<WorkerMsg>(); 277 278 self.cmd_tx = Some(cmd_tx.clone()); 279 self.msg_rx = Some(msg_rx); 280 281 // Clone the DB handle into the worker thread (Ndb is typically cheap/cloneable) 282 let ndb = ctx.ndb.clone(); 283 284 spawn_worker(egui_ctx, ndb, cmd_rx, msg_tx); 285 286 // kick the first run immediately 287 let _ = cmd_tx.send(WorkerCmd::Refresh); 288 self.running = true; 289 self.last_error = None; 290 self.last_started = Some(Instant::now()); 291 self.last_snapshot = None; 292 self.last_finished = None; 293 self.last_duration = None; 294 self.state = DashboardState::default(); 295 } 296 297 fn process_worker_msgs(&mut self) { 298 let Some(rx) = &self.msg_rx else { return }; 299 300 let mut got_any = false; 301 302 while let Ok(msg) = rx.try_recv() { 303 got_any = true; 304 match msg { 305 WorkerMsg::Snapshot(s) => { 306 self.running = true; 307 self.last_started = Some(s.started_at); 308 self.last_snapshot = Some(s.snapshot_at); 309 self.last_error = None; 310 311 self.state = s.state; 312 } 313 WorkerMsg::Finished { 314 started_at, 315 finished_at, 316 state, 317 } => { 318 self.running = false; 319 self.last_started = Some(started_at); 320 self.last_snapshot = Some(finished_at); 321 self.last_finished = Some(finished_at); 322 self.last_duration = Some(finished_at.saturating_duration_since(started_at)); 323 self.last_error = None; 324 325 self.state = state; 326 } 327 WorkerMsg::Failed { 328 started_at, 329 finished_at, 330 error, 331 } => { 332 self.running = false; 333 self.last_started = Some(started_at); 334 self.last_snapshot = Some(finished_at); 335 self.last_finished = Some(finished_at); 336 self.last_duration = Some(finished_at.saturating_duration_since(started_at)); 337 self.last_error = Some(error); 338 } 339 } 340 } 341 342 if got_any { 343 // No-op; we already requested repaint on every message. 344 } 345 } 346 347 fn schedule_refresh(&mut self) { 348 // throttle scheduling checks a bit 349 let now = Instant::now(); 350 if now < self.next_tick { 351 return; 352 } 353 self.next_tick = now + Duration::from_millis(200); 354 355 if self.running { 356 return; 357 } 358 359 // refresh every 30 seconds from the last finished time (or from init) 360 let last = self 361 .last_finished 362 .or(self.last_started) 363 .unwrap_or_else(Instant::now); 364 365 if now.saturating_duration_since(last) >= self.refresh_every 366 && let Some(tx) = &self.cmd_tx 367 { 368 // reset UI fields for progressive load, but keep old values visible until snapshots arrive 369 self.running = true; 370 self.last_error = None; 371 self.last_started = Some(now); 372 self.last_snapshot = None; 373 self.last_finished = None; 374 self.last_duration = None; 375 self.state = DashboardState::default(); 376 377 let _ = tx.send(WorkerCmd::Refresh); 378 } 379 } 380 381 fn show(&mut self, ui: &mut egui::Ui, ctx: &mut AppContext<'_>) { 382 crate::ui::dashboard_ui(self, ui, ctx); 383 } 384 } 385 386 // ---------------------- 387 // Worker side (single pass, periodic snapshots) 388 // ---------------------- 389 390 fn spawn_worker( 391 ctx: egui::Context, 392 ndb: Ndb, 393 cmd_rx: chan::Receiver<WorkerCmd>, 394 msg_tx: chan::Sender<WorkerMsg>, 395 ) { 396 thread::Builder::new() 397 .name("dashboard-worker".to_owned()) 398 .spawn(move || { 399 let mut should_quit = false; 400 401 while !should_quit { 402 match cmd_rx.recv() { 403 Ok(WorkerCmd::Refresh) => { 404 let started_at = Instant::now(); 405 406 match materialize_single_pass(&ctx, &ndb, &msg_tx, started_at) { 407 Ok(state) => { 408 let _ = msg_tx.send(WorkerMsg::Finished { 409 started_at, 410 finished_at: Instant::now(), 411 state, 412 }); 413 } 414 Err(e) => { 415 let _ = msg_tx.send(WorkerMsg::Failed { 416 started_at, 417 finished_at: Instant::now(), 418 error: format!("{e:?}"), 419 }); 420 } 421 } 422 } 423 Err(_) => { 424 should_quit = true; 425 } 426 } 427 } 428 }) 429 .expect("failed to spawn dashboard worker thread"); 430 } 431 432 struct Acc { 433 last_emit: Instant, 434 435 state: DashboardState, 436 } 437 438 fn materialize_single_pass( 439 ctx: &egui::Context, 440 ndb: &Ndb, 441 msg_tx: &chan::Sender<WorkerMsg>, 442 started_at: Instant, 443 ) -> Result<DashboardState, nostrdb::Error> { 444 // one transaction per refresh run 445 let txn = Transaction::new(ndb)?; 446 447 // all notes 448 let filters = vec![Filter::new_with_capacity(1).build()]; 449 450 let days = 14; 451 let weeks = 12; 452 let months = 12; 453 let week_starts_monday = true; 454 455 let now = Utc::now().timestamp(); 456 457 let mut acc = Acc { 458 last_emit: Instant::now(), 459 state: DashboardState { 460 total: Bucket::default(), 461 daily: RollingCache::daily(now, days), 462 weekly: RollingCache::weekly(now, weeks, week_starts_monday), 463 monthly: RollingCache::monthly_30d(now, months), 464 }, 465 }; 466 467 let emit_every = Duration::from_millis(32); 468 469 let _ = ndb.fold(&txn, &filters, &mut acc, |acc, note| { 470 acc.state.total.bump(¬e); 471 acc.state.daily.bump(¬e); 472 acc.state.weekly.bump(¬e); 473 acc.state.monthly.bump(¬e); 474 475 let now = Instant::now(); 476 if now.saturating_duration_since(acc.last_emit) >= emit_every { 477 acc.last_emit = now; 478 479 let _ = msg_tx.send(WorkerMsg::Snapshot(Snapshot { 480 started_at, 481 snapshot_at: now, 482 state: acc.state.clone(), 483 })); 484 485 ctx.request_repaint(); 486 } 487 488 acc 489 }); 490 491 Ok(acc.state) 492 } 493 494 fn next_midnight_utc(now_ts: i64) -> i64 { 495 let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap(); 496 let tomorrow = dt.date_naive().succ_opt().unwrap(); 497 Utc.from_utc_datetime(&tomorrow.and_hms_opt(0, 0, 0).unwrap()) 498 .timestamp() 499 } 500 501 fn next_week_boundary_utc(now_ts: i64, starts_monday: bool) -> i64 { 502 let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap(); 503 let today = dt.date_naive(); 504 505 let start = if starts_monday { 506 chrono::Weekday::Mon 507 } else { 508 chrono::Weekday::Sun 509 }; 510 let weekday = today.weekday(); 511 512 // days until next week start (0..6); if today is start, boundary is next week start (7 days) 513 let mut delta = 514 (7 + (start.num_days_from_monday() as i32) - (weekday.num_days_from_monday() as i32)) % 7; 515 if delta == 0 { 516 delta = 7; 517 } 518 519 let next = today + chrono::Duration::days(delta as i64); 520 Utc.from_utc_datetime(&next.and_hms_opt(0, 0, 0).unwrap()) 521 .timestamp() 522 } 523 524 fn next_month_boundary_utc(now_ts: i64) -> i64 { 525 let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap(); 526 let y = dt.year(); 527 let m = dt.month(); 528 529 let (ny, nm) = if m == 12 { (y + 1, 1) } else { (y, m + 1) }; 530 Utc.with_ymd_and_hms(ny, nm, 1, 0, 0, 0) 531 .single() 532 .unwrap() 533 .timestamp() 534 } 535 536 fn top_kinds_over(cache: &RollingCache, limit: usize) -> Vec<(u64, u64)> { 537 let mut agg: FxHashMap<u64, u64> = Default::default(); 538 539 for b in &cache.buckets { 540 for (kind, count) in &b.kinds { 541 *agg.entry(*kind).or_default() += *count as u64; 542 } 543 } 544 545 let mut v: Vec<_> = agg.into_iter().collect(); 546 v.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); 547 v.truncate(limit); 548 v 549 } 550 551 pub(crate) fn top_kind1_authors_over(cache: &RollingCache, limit: usize) -> Vec<(Pubkey, u64)> { 552 let mut agg: FxHashMap<Pubkey, u64> = Default::default(); 553 for b in &cache.buckets { 554 for (pubkey, count) in &b.kind1_authors { 555 *agg.entry(*pubkey).or_default() += *count as u64; 556 } 557 } 558 let mut v: Vec<_> = agg.into_iter().collect(); 559 v.sort_unstable_by(|a, b| b.1.cmp(&a.1)); 560 v.truncate(limit); 561 v 562 }