lib.rs (15672B)
1 use enostr::Pubkey; 2 use nostrdb::Note; 3 use rustc_hash::FxHashMap; 4 use std::thread; 5 use std::time::{Duration, Instant}; 6 7 use crossbeam_channel as chan; 8 9 use nostrdb::{Filter, Ndb, Transaction}; 10 use notedeck::{AppContext, AppResponse}; 11 12 use chrono::{Datelike, TimeZone, Utc}; 13 14 mod chart; 15 mod sparkline; 16 mod ui; 17 18 // ---------------------- 19 // Worker protocol 20 // ---------------------- 21 22 #[derive(Debug)] 23 enum WorkerCmd { 24 Refresh, 25 //Quit, 26 } 27 28 // Buckets are multiples of time ranges 29 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 30 pub enum Period { 31 Daily, 32 Weekly, 33 Monthly, 34 } 35 36 impl Period { 37 pub const ALL: [Period; 3] = [Period::Daily, Period::Weekly, Period::Monthly]; 38 39 pub fn label(self) -> &'static str { 40 match self { 41 Period::Daily => "day", 42 Period::Weekly => "week", 43 Period::Monthly => "month", 44 } 45 } 46 } 47 48 /// All the data we are interested in for a specific range 49 #[derive(Default, Clone, Debug)] 50 struct Bucket { 51 pub total: u64, 52 pub kinds: rustc_hash::FxHashMap<u64, u32>, 53 pub clients: rustc_hash::FxHashMap<String, u32>, 54 pub kind1_authors: rustc_hash::FxHashMap<Pubkey, u32>, 55 } 56 57 fn note_client_tag<'a>(note: &Note<'a>) -> Option<&'a str> { 58 for tag in note.tags() { 59 if tag.count() < 2 { 60 continue; 61 } 62 63 let Some("client") = tag.get_str(0) else { 64 continue; 65 }; 66 67 return tag.get_str(1); 68 } 69 70 None 71 } 72 73 impl Bucket { 74 #[inline(always)] 75 pub fn bump(&mut self, note: &Note<'_>) { 76 self.total += 1; 77 let kind = note.kind(); 78 *self.kinds.entry(kind as u64).or_default() += 1; 79 80 // Track kind1 authors 81 if kind == 1 { 82 let pk = Pubkey::new(*note.pubkey()); 83 *self.kind1_authors.entry(pk).or_default() += 1; 84 } 85 86 if let Some(client) = note_client_tag(note) { 87 *self.clients.entry(client.to_string()).or_default() += 1; 88 } else { 89 // TODO(jb55): client fingerprinting ? 90 } 91 } 92 } 93 94 // bucket_end_ts(idx) - self.bucket_size_secs 95 #[derive(Debug, Clone, Default)] 96 struct RollingCache { 97 pub bucket_size_secs: i64, 98 pub anchor_end_ts: i64, 99 pub buckets: Vec<Bucket>, 100 } 101 102 impl RollingCache { 103 pub fn bucket_end_ts(&self, idx: usize) -> i64 { 104 self.anchor_end_ts - (idx as i64) * self.bucket_size_secs 105 } 106 107 pub fn bucket_start_ts(&self, idx: usize) -> i64 { 108 self.bucket_end_ts(idx) - self.bucket_size_secs 109 } 110 111 pub fn daily(now_ts: i64, days: usize) -> Self { 112 let day_anchor = next_midnight_utc(now_ts); 113 114 Self { 115 bucket_size_secs: 86_400, 116 anchor_end_ts: day_anchor, 117 buckets: vec![Bucket::default(); days], 118 } 119 } 120 121 pub fn weekly(now_ts: i64, weeks: usize, week_starts_monday: bool) -> Self { 122 let anchor_end_ts = next_week_boundary_utc(now_ts, week_starts_monday); 123 Self { 124 bucket_size_secs: 7 * 86_400, 125 anchor_end_ts, 126 buckets: vec![Bucket::default(); weeks], 127 } 128 } 129 130 // “month-ish” (30d buckets) but aligned so bucket 0 ends at the next month boundary 131 pub fn monthly_30d(now_ts: i64, months: usize) -> Self { 132 let anchor_end_ts = next_month_boundary_utc(now_ts); 133 Self { 134 bucket_size_secs: 30 * 86_400, 135 anchor_end_ts, 136 buckets: vec![Bucket::default(); months], 137 } 138 } 139 140 #[inline(always)] 141 pub fn bump(&mut self, note: &Note<'_>) { 142 let ts = note.created_at() as i64; 143 144 // bucket windows are [end-(i+1)*size, end-i*size) 145 // so treat `end` itself as "future" 146 let delta = (self.anchor_end_ts - 1) - ts; 147 148 if delta < 0 { 149 return; // ignore future timestamps 150 } 151 152 let idx = (delta / self.bucket_size_secs) as usize; 153 if idx >= self.buckets.len() { 154 return; // outside window 155 } 156 157 self.buckets[idx].bump(note); 158 } 159 } 160 161 #[derive(Clone, Debug, Default)] 162 struct DashboardState { 163 total: Bucket, 164 daily: RollingCache, 165 weekly: RollingCache, 166 monthly: RollingCache, 167 } 168 169 #[derive(Debug, Clone)] 170 struct Snapshot { 171 started_at: Instant, 172 snapshot_at: Instant, 173 state: DashboardState, 174 } 175 176 #[derive(Debug)] 177 enum WorkerMsg { 178 Snapshot(Snapshot), 179 Finished { 180 started_at: Instant, 181 finished_at: Instant, 182 state: DashboardState, 183 }, 184 Failed { 185 started_at: Instant, 186 finished_at: Instant, 187 error: String, 188 }, 189 } 190 191 // ---------------------- 192 // Dashboard (single pass, single worker) 193 // ---------------------- 194 195 pub struct Dashboard { 196 initialized: bool, 197 198 // Worker channels 199 cmd_tx: Option<chan::Sender<WorkerCmd>>, 200 msg_rx: Option<chan::Receiver<WorkerMsg>>, 201 202 // Refresh policy 203 refresh_every: Duration, 204 next_tick: Instant, 205 206 // Global UI controls 207 period: Period, 208 209 // UI state (progressively filled via snapshots) 210 running: bool, 211 212 last_started: Option<Instant>, 213 last_snapshot: Option<Instant>, 214 last_finished: Option<Instant>, 215 last_duration: Option<Duration>, 216 last_error: Option<String>, 217 218 state: DashboardState, 219 } 220 221 impl Default for Dashboard { 222 fn default() -> Self { 223 Self { 224 initialized: false, 225 226 period: Period::Weekly, 227 228 cmd_tx: None, 229 msg_rx: None, 230 231 refresh_every: Duration::from_secs(300), 232 next_tick: Instant::now(), 233 234 running: false, 235 last_started: None, 236 last_snapshot: None, 237 last_finished: None, 238 last_duration: None, 239 last_error: None, 240 241 state: DashboardState::default(), 242 } 243 } 244 } 245 246 impl notedeck::App for Dashboard { 247 fn update(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) { 248 if !self.initialized { 249 self.initialized = true; 250 self.init(egui_ctx.clone(), ctx); 251 } 252 253 self.process_worker_msgs(); 254 self.schedule_refresh(); 255 } 256 257 fn render(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { 258 self.show(ui, ctx); 259 AppResponse::none() 260 } 261 } 262 263 impl Dashboard { 264 fn selected_cache(&self) -> &RollingCache { 265 match self.period { 266 Period::Daily => &self.state.daily, 267 Period::Weekly => &self.state.weekly, 268 Period::Monthly => &self.state.monthly, 269 } 270 } 271 272 fn init(&mut self, egui_ctx: egui::Context, ctx: &mut AppContext<'_>) { 273 // spawn single worker thread and keep it alive 274 let (cmd_tx, cmd_rx) = chan::unbounded::<WorkerCmd>(); 275 let (msg_tx, msg_rx) = chan::unbounded::<WorkerMsg>(); 276 277 self.cmd_tx = Some(cmd_tx.clone()); 278 self.msg_rx = Some(msg_rx); 279 280 // Clone the DB handle into the worker thread (Ndb is typically cheap/cloneable) 281 let ndb = ctx.ndb.clone(); 282 283 spawn_worker(egui_ctx, ndb, cmd_rx, msg_tx); 284 285 // kick the first run immediately 286 let _ = cmd_tx.send(WorkerCmd::Refresh); 287 self.running = true; 288 self.last_error = None; 289 self.last_started = Some(Instant::now()); 290 self.last_snapshot = None; 291 self.last_finished = None; 292 self.last_duration = None; 293 self.state = DashboardState::default(); 294 } 295 296 fn process_worker_msgs(&mut self) { 297 let Some(rx) = &self.msg_rx else { return }; 298 299 let mut got_any = false; 300 301 while let Ok(msg) = rx.try_recv() { 302 got_any = true; 303 match msg { 304 WorkerMsg::Snapshot(s) => { 305 self.running = true; 306 self.last_started = Some(s.started_at); 307 self.last_snapshot = Some(s.snapshot_at); 308 self.last_error = None; 309 310 self.state = s.state; 311 } 312 WorkerMsg::Finished { 313 started_at, 314 finished_at, 315 state, 316 } => { 317 self.running = false; 318 self.last_started = Some(started_at); 319 self.last_snapshot = Some(finished_at); 320 self.last_finished = Some(finished_at); 321 self.last_duration = Some(finished_at.saturating_duration_since(started_at)); 322 self.last_error = None; 323 324 self.state = state; 325 } 326 WorkerMsg::Failed { 327 started_at, 328 finished_at, 329 error, 330 } => { 331 self.running = false; 332 self.last_started = Some(started_at); 333 self.last_snapshot = Some(finished_at); 334 self.last_finished = Some(finished_at); 335 self.last_duration = Some(finished_at.saturating_duration_since(started_at)); 336 self.last_error = Some(error); 337 } 338 } 339 } 340 341 if got_any { 342 // No-op; we already requested repaint on every message. 343 } 344 } 345 346 fn schedule_refresh(&mut self) { 347 // throttle scheduling checks a bit 348 let now = Instant::now(); 349 if now < self.next_tick { 350 return; 351 } 352 self.next_tick = now + Duration::from_millis(200); 353 354 if self.running { 355 return; 356 } 357 358 // refresh every 30 seconds from the last finished time (or from init) 359 let last = self 360 .last_finished 361 .or(self.last_started) 362 .unwrap_or_else(Instant::now); 363 364 if now.saturating_duration_since(last) >= self.refresh_every 365 && let Some(tx) = &self.cmd_tx 366 { 367 // reset UI fields for progressive load, but keep old values visible until snapshots arrive 368 self.running = true; 369 self.last_error = None; 370 self.last_started = Some(now); 371 self.last_snapshot = None; 372 self.last_finished = None; 373 self.last_duration = None; 374 self.state = DashboardState::default(); 375 376 let _ = tx.send(WorkerCmd::Refresh); 377 } 378 } 379 380 fn show(&mut self, ui: &mut egui::Ui, ctx: &mut AppContext<'_>) { 381 crate::ui::dashboard_ui(self, ui, ctx); 382 } 383 } 384 385 // ---------------------- 386 // Worker side (single pass, periodic snapshots) 387 // ---------------------- 388 389 fn spawn_worker( 390 ctx: egui::Context, 391 ndb: Ndb, 392 cmd_rx: chan::Receiver<WorkerCmd>, 393 msg_tx: chan::Sender<WorkerMsg>, 394 ) { 395 thread::Builder::new() 396 .name("dashboard-worker".to_owned()) 397 .spawn(move || { 398 let mut should_quit = false; 399 400 while !should_quit { 401 match cmd_rx.recv() { 402 Ok(WorkerCmd::Refresh) => { 403 let started_at = Instant::now(); 404 405 match materialize_single_pass(&ctx, &ndb, &msg_tx, started_at) { 406 Ok(state) => { 407 let _ = msg_tx.send(WorkerMsg::Finished { 408 started_at, 409 finished_at: Instant::now(), 410 state, 411 }); 412 } 413 Err(e) => { 414 let _ = msg_tx.send(WorkerMsg::Failed { 415 started_at, 416 finished_at: Instant::now(), 417 error: format!("{e:?}"), 418 }); 419 } 420 } 421 } 422 Err(_) => { 423 should_quit = true; 424 } 425 } 426 } 427 }) 428 .expect("failed to spawn dashboard worker thread"); 429 } 430 431 struct Acc { 432 last_emit: Instant, 433 434 state: DashboardState, 435 } 436 437 fn materialize_single_pass( 438 ctx: &egui::Context, 439 ndb: &Ndb, 440 msg_tx: &chan::Sender<WorkerMsg>, 441 started_at: Instant, 442 ) -> Result<DashboardState, nostrdb::Error> { 443 // one transaction per refresh run 444 let txn = Transaction::new(ndb)?; 445 446 // all notes 447 let filters = vec![Filter::new_with_capacity(1).build()]; 448 449 let days = 14; 450 let weeks = 12; 451 let months = 12; 452 let week_starts_monday = true; 453 454 let now = Utc::now().timestamp(); 455 456 let mut acc = Acc { 457 last_emit: Instant::now(), 458 state: DashboardState { 459 total: Bucket::default(), 460 daily: RollingCache::daily(now, days), 461 weekly: RollingCache::weekly(now, weeks, week_starts_monday), 462 monthly: RollingCache::monthly_30d(now, months), 463 }, 464 }; 465 466 let emit_every = Duration::from_millis(32); 467 468 let _ = ndb.fold(&txn, &filters, &mut acc, |acc, note| { 469 acc.state.total.bump(¬e); 470 acc.state.daily.bump(¬e); 471 acc.state.weekly.bump(¬e); 472 acc.state.monthly.bump(¬e); 473 474 let now = Instant::now(); 475 if now.saturating_duration_since(acc.last_emit) >= emit_every { 476 acc.last_emit = now; 477 478 let _ = msg_tx.send(WorkerMsg::Snapshot(Snapshot { 479 started_at, 480 snapshot_at: now, 481 state: acc.state.clone(), 482 })); 483 484 ctx.request_repaint(); 485 } 486 487 acc 488 }); 489 490 Ok(acc.state) 491 } 492 493 fn next_midnight_utc(now_ts: i64) -> i64 { 494 let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap(); 495 let tomorrow = dt.date_naive().succ_opt().unwrap(); 496 Utc.from_utc_datetime(&tomorrow.and_hms_opt(0, 0, 0).unwrap()) 497 .timestamp() 498 } 499 500 fn next_week_boundary_utc(now_ts: i64, starts_monday: bool) -> i64 { 501 let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap(); 502 let today = dt.date_naive(); 503 504 let start = if starts_monday { 505 chrono::Weekday::Mon 506 } else { 507 chrono::Weekday::Sun 508 }; 509 let weekday = today.weekday(); 510 511 // days until next week start (0..6); if today is start, boundary is next week start (7 days) 512 let mut delta = 513 (7 + (start.num_days_from_monday() as i32) - (weekday.num_days_from_monday() as i32)) % 7; 514 if delta == 0 { 515 delta = 7; 516 } 517 518 let next = today + chrono::Duration::days(delta as i64); 519 Utc.from_utc_datetime(&next.and_hms_opt(0, 0, 0).unwrap()) 520 .timestamp() 521 } 522 523 fn next_month_boundary_utc(now_ts: i64) -> i64 { 524 let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap(); 525 let y = dt.year(); 526 let m = dt.month(); 527 528 let (ny, nm) = if m == 12 { (y + 1, 1) } else { (y, m + 1) }; 529 Utc.with_ymd_and_hms(ny, nm, 1, 0, 0, 0) 530 .single() 531 .unwrap() 532 .timestamp() 533 } 534 535 fn top_kinds_over(cache: &RollingCache, limit: usize) -> Vec<(u64, u64)> { 536 let mut agg: FxHashMap<u64, u64> = Default::default(); 537 538 for b in &cache.buckets { 539 for (kind, count) in &b.kinds { 540 *agg.entry(*kind).or_default() += *count as u64; 541 } 542 } 543 544 let mut v: Vec<_> = agg.into_iter().collect(); 545 v.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); 546 v.truncate(limit); 547 v 548 } 549 550 pub(crate) fn top_kind1_authors_over(cache: &RollingCache, limit: usize) -> Vec<(Pubkey, u64)> { 551 let mut agg: FxHashMap<Pubkey, u64> = Default::default(); 552 for b in &cache.buckets { 553 for (pubkey, count) in &b.kind1_authors { 554 *agg.entry(*pubkey).or_default() += *count as u64; 555 } 556 } 557 let mut v: Vec<_> = agg.into_iter().collect(); 558 v.sort_unstable_by(|a, b| b.1.cmp(&a.1)); 559 v.truncate(limit); 560 v 561 }