notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 3cac808791d191f6c7081e5f207109e94b2945fe
parent 59419fcf8e74476640fd09fd7f15d9f6c9344d0a
Author: kernelkind <kernelkind@gmail.com>
Date:   Tue, 28 Oct 2025 16:32:31 -0400

feat(jobs): mpsc -> mpmc

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
Mcrates/notedeck/src/jobs/job_pool.rs | 55++++++++++++++++++++++++-------------------------------
1 file changed, 24 insertions(+), 31 deletions(-)

diff --git a/crates/notedeck/src/jobs/job_pool.rs b/crates/notedeck/src/jobs/job_pool.rs @@ -1,16 +1,11 @@ -use std::{ - future::Future, - sync::{ - mpsc::{self, Sender}, - Arc, Mutex, - }, -}; -use tokio::sync::oneshot; +use crossbeam::channel; +use std::future::Future; +use tokio::sync::oneshot::{self}; type Job = Box<dyn FnOnce() + Send + 'static>; pub struct JobPool { - tx: Sender<Job>, + tx: channel::Sender<Job>, } impl Default for JobPool { @@ -21,25 +16,15 @@ impl Default for JobPool { impl JobPool { pub fn new(num_threads: usize) -> Self { - let (tx, rx) = mpsc::channel::<Job>(); - - // TODO(jb55) why not mpmc here !??? - let arc_rx = Arc::new(Mutex::new(rx)); - for _ in 0..num_threads { - let arc_rx_clone = arc_rx.clone(); - std::thread::spawn(move || loop { - let job = { - let Ok(unlocked) = arc_rx_clone.lock() else { - continue; - }; - let Ok(job) = unlocked.recv() else { - continue; - }; - - job - }; - - job(); + let (tx, rx) = channel::unbounded::<Job>(); + for i in 0..num_threads { + let rx = rx.clone(); + std::thread::spawn(move || { + for job in rx.iter() { + tracing::trace!("Starting job on thread {i}"); + job(); + tracing::trace!("Finished job on thread {i}"); + } }); } @@ -58,9 +43,7 @@ impl JobPool { let _ = tx_result.send(output); }); - self.tx - .send(job) - .expect("receiver should not be deallocated"); + self.push_job(job); async move { rx_result.await.unwrap_or_else(|_| { @@ -68,6 +51,16 @@ impl JobPool { }) } } + + pub fn schedule_no_output(&self, job: impl FnOnce() + Send + 'static) { + self.push_job(Box::new(job)); + } + + fn push_job(&self, job: Job) { + if let Err(e) = self.tx.send(job) { + tracing::error!("job queue closed unexpectedly: {e}"); + } + } } #[cfg(test)]