notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 025b4d749c41b829d9e5887ba1f7afcca6cf5246
parent 4f542e318029104c43714565fce5946e846dc9cb
Author: kernelkind <kernelkind@gmail.com>
Date:   Tue, 28 Oct 2025 17:45:09 -0400

refactor(jobs): move related jobs things to own module

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
Dcrates/notedeck/src/job_pool.rs | 100-------------------------------------------------------------------------------
Dcrates/notedeck/src/jobs.rs | 153-------------------------------------------------------------------------------
Acrates/notedeck/src/jobs/cache.rs | 154+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/notedeck/src/jobs/job_pool.rs | 100+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/notedeck/src/jobs/mod.rs | 7+++++++
Mcrates/notedeck/src/lib.rs | 6++----
6 files changed, 263 insertions(+), 257 deletions(-)

diff --git a/crates/notedeck/src/job_pool.rs b/crates/notedeck/src/job_pool.rs @@ -1,100 +0,0 @@ -use std::{ - future::Future, - sync::{ - mpsc::{self, Sender}, - Arc, Mutex, - }, -}; -use tokio::sync::oneshot; - -type Job = Box<dyn FnOnce() + Send + 'static>; - -pub struct JobPool { - tx: Sender<Job>, -} - -impl Default for JobPool { - fn default() -> Self { - JobPool::new(2) - } -} - -impl JobPool { - pub fn new(num_threads: usize) -> Self { - let (tx, rx) = mpsc::channel::<Job>(); - - // TODO(jb55) why not mpmc here !??? - let arc_rx = Arc::new(Mutex::new(rx)); - for _ in 0..num_threads { - let arc_rx_clone = arc_rx.clone(); - std::thread::spawn(move || loop { - let job = { - let Ok(unlocked) = arc_rx_clone.lock() else { - continue; - }; - let Ok(job) = unlocked.recv() else { - continue; - }; - - job - }; - - job(); - }); - } - - Self { tx } - } - - pub fn schedule<F, T>(&self, job: F) -> impl Future<Output = T> - where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, - { - let (tx_result, rx_result) = oneshot::channel::<T>(); - - let job = Box::new(move || { - let output = job(); - let _ = tx_result.send(output); - }); - - self.tx - .send(job) - .expect("receiver should not be deallocated"); - - async move { - rx_result.await.unwrap_or_else(|_| { - panic!("Worker thread or channel dropped before returning the result.") - }) - } - } -} - -#[cfg(test)] -mod tests { - use crate::job_pool::JobPool; - - fn test_fn(a: u32, b: u32) -> u32 { - a + b - } - - #[tokio::test] - async fn test() { - let pool = JobPool::default(); - - // Now each job can return different T - let future_str = pool.schedule(|| -> String { "hello from string job".into() }); - - let a = 5; - let b = 6; - let future_int = pool.schedule(move || -> u32 { test_fn(a, b) }); - - println!("(Meanwhile we can do more async work) ..."); - - let s = future_str.await; - let i = future_int.await; - - println!("Got string: {:?}", s); - println!("Got integer: {}", i); - } -} diff --git a/crates/notedeck/src/jobs.rs b/crates/notedeck/src/jobs.rs @@ -1,153 +0,0 @@ -use crate::JobPool; -use egui::TextureHandle; -use hashbrown::{hash_map::RawEntryMut, HashMap}; -use poll_promise::Promise; - -#[derive(Default)] -pub struct JobsCache { - jobs: HashMap<JobIdOwned, JobState>, -} - -pub enum JobState { - Pending(Promise<Option<Result<Job, JobError>>>), - Error(JobError), - Completed(Job), -} - -pub enum JobError { - InvalidParameters, -} - -#[derive(Debug)] -pub enum JobParams<'a> { - Blurhash(BlurhashParams<'a>), -} - -#[derive(Debug)] -pub enum JobParamsOwned { - Blurhash(BlurhashParamsOwned), -} - -impl<'a> From<BlurhashParams<'a>> for BlurhashParamsOwned { - fn from(params: BlurhashParams<'a>) -> Self { - BlurhashParamsOwned { - blurhash: params.blurhash.to_owned(), - url: params.url.to_owned(), - ctx: params.ctx.clone(), - } - } -} - -impl<'a> From<JobParams<'a>> for JobParamsOwned { - fn from(params: JobParams<'a>) -> Self { - match params { - JobParams::Blurhash(bp) => JobParamsOwned::Blurhash(bp.into()), - } - } -} - -#[derive(Debug)] -pub struct BlurhashParams<'a> { - pub blurhash: &'a str, - pub url: &'a str, - pub ctx: &'a egui::Context, -} - -#[derive(Debug)] -pub struct BlurhashParamsOwned { - pub blurhash: String, - pub url: String, - pub ctx: egui::Context, -} - -impl JobsCache { - pub fn get_or_insert_with< - 'a, - F: FnOnce(Option<JobParamsOwned>) -> Result<Job, JobError> + Send + 'static, - >( - &'a mut self, - job_pool: &mut JobPool, - jobid: &JobId, - params: Option<JobParams>, - run_job: F, - ) -> &'a mut JobState { - match self.jobs.raw_entry_mut().from_key(jobid) { - RawEntryMut::Occupied(entry) => 's: { - let mut state = entry.into_mut(); - - let JobState::Pending(promise) = &mut state else { - break 's state; - }; - - let Some(res) = promise.ready_mut() else { - break 's state; - }; - - let Some(res) = res.take() else { - tracing::error!("Failed to take the promise for job: {:?}", jobid); - break 's state; - }; - - *state = match res { - Ok(j) => JobState::Completed(j), - Err(e) => JobState::Error(e), - }; - - state - } - RawEntryMut::Vacant(entry) => { - let owned_params = params.map(JobParams::into); - let wrapped: Box<dyn FnOnce() -> Option<Result<Job, JobError>> + Send + 'static> = - Box::new(move || Some(run_job(owned_params))); - - let promise = Promise::spawn_async(job_pool.schedule(wrapped)); - - let (_, state) = entry.insert(jobid.into(), JobState::Pending(promise)); - - state - } - } - } - - pub fn get(&self, jobid: &JobId) -> Option<&JobState> { - self.jobs.get(jobid) - } -} - -impl<'a> From<&JobId<'a>> for JobIdOwned { - fn from(jobid: &JobId<'a>) -> Self { - match jobid { - JobId::Blurhash(s) => JobIdOwned::Blurhash(s.to_string()), - } - } -} - -impl hashbrown::Equivalent<JobIdOwned> for JobId<'_> { - fn equivalent(&self, key: &JobIdOwned) -> bool { - match (self, key) { - (JobId::Blurhash(a), JobIdOwned::Blurhash(b)) => *a == b.as_str(), - } - } -} - -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -enum JobIdOwned { - Blurhash(String), // image URL -} - -#[derive(Debug, Hash)] -pub enum JobId<'a> { - Blurhash(&'a str), // image URL -} - -pub enum Job { - Blurhash(Option<TextureHandle>), -} - -impl std::fmt::Debug for Job { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Job::Blurhash(_) => write!(f, "Blurhash"), - } - } -} diff --git a/crates/notedeck/src/jobs/cache.rs b/crates/notedeck/src/jobs/cache.rs @@ -0,0 +1,154 @@ +use egui::TextureHandle; +use hashbrown::{hash_map::RawEntryMut, HashMap}; +use poll_promise::Promise; + +use crate::jobs::JobPool; + +#[derive(Default)] +pub struct JobsCache { + jobs: HashMap<JobIdOwned, JobState>, +} + +pub enum JobState { + Pending(Promise<Option<Result<Job, JobError>>>), + Error(JobError), + Completed(Job), +} + +pub enum JobError { + InvalidParameters, +} + +#[derive(Debug)] +pub enum JobParams<'a> { + Blurhash(BlurhashParams<'a>), +} + +#[derive(Debug)] +pub enum JobParamsOwned { + Blurhash(BlurhashParamsOwned), +} + +impl<'a> From<BlurhashParams<'a>> for BlurhashParamsOwned { + fn from(params: BlurhashParams<'a>) -> Self { + BlurhashParamsOwned { + blurhash: params.blurhash.to_owned(), + url: params.url.to_owned(), + ctx: params.ctx.clone(), + } + } +} + +impl<'a> From<JobParams<'a>> for JobParamsOwned { + fn from(params: JobParams<'a>) -> Self { + match params { + JobParams::Blurhash(bp) => JobParamsOwned::Blurhash(bp.into()), + } + } +} + +#[derive(Debug)] +pub struct BlurhashParams<'a> { + pub blurhash: &'a str, + pub url: &'a str, + pub ctx: &'a egui::Context, +} + +#[derive(Debug)] +pub struct BlurhashParamsOwned { + pub blurhash: String, + pub url: String, + pub ctx: egui::Context, +} + +impl JobsCache { + pub fn get_or_insert_with< + 'a, + F: FnOnce(Option<JobParamsOwned>) -> Result<Job, JobError> + Send + 'static, + >( + &'a mut self, + job_pool: &mut JobPool, + jobid: &JobId, + params: Option<JobParams>, + run_job: F, + ) -> &'a mut JobState { + match self.jobs.raw_entry_mut().from_key(jobid) { + RawEntryMut::Occupied(entry) => 's: { + let mut state = entry.into_mut(); + + let JobState::Pending(promise) = &mut state else { + break 's state; + }; + + let Some(res) = promise.ready_mut() else { + break 's state; + }; + + let Some(res) = res.take() else { + tracing::error!("Failed to take the promise for job: {:?}", jobid); + break 's state; + }; + + *state = match res { + Ok(j) => JobState::Completed(j), + Err(e) => JobState::Error(e), + }; + + state + } + RawEntryMut::Vacant(entry) => { + let owned_params = params.map(JobParams::into); + let wrapped: Box<dyn FnOnce() -> Option<Result<Job, JobError>> + Send + 'static> = + Box::new(move || Some(run_job(owned_params))); + + let promise = Promise::spawn_async(job_pool.schedule(wrapped)); + + let (_, state) = entry.insert(jobid.into(), JobState::Pending(promise)); + + state + } + } + } + + pub fn get(&self, jobid: &JobId) -> Option<&JobState> { + self.jobs.get(jobid) + } +} + +impl<'a> From<&JobId<'a>> for JobIdOwned { + fn from(jobid: &JobId<'a>) -> Self { + match jobid { + JobId::Blurhash(s) => JobIdOwned::Blurhash(s.to_string()), + } + } +} + +impl hashbrown::Equivalent<JobIdOwned> for JobId<'_> { + fn equivalent(&self, key: &JobIdOwned) -> bool { + match (self, key) { + (JobId::Blurhash(a), JobIdOwned::Blurhash(b)) => *a == b.as_str(), + } + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +enum JobIdOwned { + Blurhash(String), // image URL +} + +#[derive(Debug, Hash)] +pub enum JobId<'a> { + Blurhash(&'a str), // image URL +} + +pub enum Job { + Blurhash(Option<TextureHandle>), +} + +impl std::fmt::Debug for Job { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Job::Blurhash(_) => write!(f, "Blurhash"), + } + } +} diff --git a/crates/notedeck/src/jobs/job_pool.rs b/crates/notedeck/src/jobs/job_pool.rs @@ -0,0 +1,100 @@ +use std::{ + future::Future, + sync::{ + mpsc::{self, Sender}, + Arc, Mutex, + }, +}; +use tokio::sync::oneshot; + +type Job = Box<dyn FnOnce() + Send + 'static>; + +pub struct JobPool { + tx: Sender<Job>, +} + +impl Default for JobPool { + fn default() -> Self { + JobPool::new(2) + } +} + +impl JobPool { + pub fn new(num_threads: usize) -> Self { + let (tx, rx) = mpsc::channel::<Job>(); + + // TODO(jb55) why not mpmc here !??? + let arc_rx = Arc::new(Mutex::new(rx)); + for _ in 0..num_threads { + let arc_rx_clone = arc_rx.clone(); + std::thread::spawn(move || loop { + let job = { + let Ok(unlocked) = arc_rx_clone.lock() else { + continue; + }; + let Ok(job) = unlocked.recv() else { + continue; + }; + + job + }; + + job(); + }); + } + + Self { tx } + } + + pub fn schedule<F, T>(&self, job: F) -> impl Future<Output = T> + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (tx_result, rx_result) = oneshot::channel::<T>(); + + let job = Box::new(move || { + let output = job(); + let _ = tx_result.send(output); + }); + + self.tx + .send(job) + .expect("receiver should not be deallocated"); + + async move { + rx_result.await.unwrap_or_else(|_| { + panic!("Worker thread or channel dropped before returning the result.") + }) + } + } +} + +#[cfg(test)] +mod tests { + use crate::jobs::JobPool; + + fn test_fn(a: u32, b: u32) -> u32 { + a + b + } + + #[tokio::test] + async fn test() { + let pool = JobPool::default(); + + // Now each job can return different T + let future_str = pool.schedule(|| -> String { "hello from string job".into() }); + + let a = 5; + let b = 6; + let future_int = pool.schedule(move || -> u32 { test_fn(a, b) }); + + println!("(Meanwhile we can do more async work) ..."); + + let s = future_str.await; + let i = future_int.await; + + println!("Got string: {:?}", s); + println!("Got integer: {}", i); + } +} diff --git a/crates/notedeck/src/jobs/mod.rs b/crates/notedeck/src/jobs/mod.rs @@ -0,0 +1,7 @@ +mod cache; +mod job_pool; + +pub use cache::{ + BlurhashParams, Job, JobError, JobId, JobParams, JobParamsOwned, JobState, JobsCache, +}; +pub use job_pool::JobPool; diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs @@ -13,8 +13,7 @@ pub mod fonts; mod frame_history; pub mod i18n; mod imgcache; -mod job_pool; -mod jobs; +pub mod jobs; pub mod media; mod muted; pub mod name; @@ -59,9 +58,8 @@ pub use imgcache::{ LoadableTextureState, MediaCache, MediaCacheType, RenderState, TextureFrame, TextureState, TexturedImage, TexturesCache, }; -pub use job_pool::JobPool; pub use jobs::{ - BlurhashParams, Job, JobError, JobId, JobParams, JobParamsOwned, JobState, JobsCache, + BlurhashParams, Job, JobError, JobId, JobParams, JobParamsOwned, JobPool, JobState, JobsCache, }; pub use media::{ compute_blurhash, update_imeta_blurhashes, ImageMetadata, ImageType, MediaAction,