commit 102b3ddfb54884caf81aad03d46602ec25728cb8
Author: William Casarin <jb55@jb55.com>
Date: Tue, 25 Mar 2025 08:57:50 -0700
vibe coded work queue
Diffstat:
4 files changed, 218 insertions(+), 0 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -0,0 +1,7 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "jobs"
+version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
@@ -0,0 +1,6 @@
+[package]
+name = "jobs"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
diff --git a/src/jobs.rs b/src/jobs.rs
@@ -0,0 +1,203 @@
+
+use std::sync::{mpsc, Arc, Mutex};
+use std::thread;
+
+// A worker that stores no T data directly.
+struct Worker {
+ id: usize,
+ handle: Option<thread::JoinHandle<()>>,
+}
+
+impl Worker {
+ /// Generic function that spawns a thread capable of processing `Message<T>`.
+ /// This does the same job as `Worker<T>::new` did, but doesn't make `Worker` itself generic.
+ fn spawn_worker_for<T: Send + 'static>(
+ id: usize,
+ receiver: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
+ ) -> Self {
+ let handle = thread::spawn(move || loop {
+ match receiver.lock().expect("lock poisoned").recv() {
+ Ok(Message::NewJob(data, job)) => {
+ println!("Worker {id} got a job; executing on data...");
+ (job)(data);
+ }
+ Ok(Message::Terminate) => {
+ println!("Worker {id} told to terminate.");
+ break;
+ }
+ Err(_) => {
+ println!("Worker {id} failed to receive message; terminating.");
+ break;
+ }
+ }
+ });
+
+ Worker {
+ id,
+ handle: Some(handle),
+ }
+ }
+}
+
+// Our job and message definitions
+type Job<T> = Box<dyn FnOnce(T) + Send + 'static>;
+
+pub enum Message<T> {
+ NewJob(T, Job<T>),
+ Terminate,
+}
+
+// The thread pool is still generic over T
+pub struct ThreadPool<T> {
+ workers: Vec<Worker>,
+ sender: mpsc::Sender<Message<T>>,
+}
+
+impl<T: Send + 'static> ThreadPool<T> {
+ pub fn new(size: usize) -> Self {
+ assert!(size > 0);
+ let (sender, receiver) = mpsc::channel::<Message<T>>();
+ let receiver = Arc::new(Mutex::new(receiver));
+
+ // Create plain `Worker` structs using a generic spawn function
+ let mut workers = Vec::with_capacity(size);
+ for id in 0..size {
+ workers.push(Worker::spawn_worker_for(id, Arc::clone(&receiver)));
+ }
+
+ Self { workers, sender }
+ }
+
+ pub fn execute<F>(&self, data: T, f: F)
+ where
+ F: FnOnce(T) + Send + 'static,
+ {
+ let job = Box::new(f) as Job<T>;
+ self.sender
+ .send(Message::NewJob(data, job))
+ .expect("Failed to send job to worker.");
+ }
+}
+
+impl<T> Drop for ThreadPool<T> {
+ fn drop(&mut self) {
+ // Ask each worker to terminate
+ for _ in &self.workers {
+ let _ = self.sender.send(Message::Terminate);
+ }
+
+ // Join them
+ for worker in &mut self.workers {
+ if let Some(handle) = worker.handle.take() {
+ handle.join().expect("Worker thread panicked.");
+ }
+ }
+ }
+}
+
+// Example usage
+fn main() {
+ let pool = ThreadPool::new(4);
+
+ for i in 0..8 {
+ pool.execute(i, move |num| {
+ println!("Data {num} processed in a worker thread.");
+ });
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::{mpsc, Arc, Mutex};
+ use std::thread;
+ use std::time::Duration;
+
+ /// Test basic task execution and verify that we can gather all results.
+ #[test]
+ fn test_execute_tasks() {
+ let pool = ThreadPool::new(4);
+ let (tx, rx) = mpsc::channel();
+
+ // Send 8 jobs to the thread pool. Each job sends back its data via tx.
+ for i in 0..8 {
+ let tx_clone = tx.clone();
+ pool.execute(i, move |num| {
+ // Simulate some work
+ thread::sleep(Duration::from_millis(10));
+ tx_clone.send(num).expect("Failed to send result");
+ });
+ }
+
+ // Gather results
+ let mut results = Vec::new();
+ for _ in 0..8 {
+ results.push(rx.recv().expect("Failed to receive result"));
+ }
+ results.sort();
+ assert_eq!(results, (0..8).collect::<Vec<_>>());
+ }
+
+ /// Test that the thread pool does not allow a size of 0.
+ /// This test should panic due to the assert!(size > 0) in `ThreadPool::new`.
+ #[test]
+ #[should_panic(expected = "size > 0")]
+ fn test_zero_size_pool_should_panic() {
+ let _ = ThreadPool::<i32>::new(0);
+ }
+
+ /// Test dropping the pool after submitting jobs.
+ /// If everything is correct, the workers will gracefully terminate.
+ #[test]
+ fn test_dropping_pool() {
+ let pool = ThreadPool::new(2);
+ let (tx, rx) = mpsc::channel();
+
+ for i in 0..4 {
+ let tx_clone = tx.clone();
+ pool.execute(i, move |num| {
+ // Simulate work
+ thread::sleep(Duration::from_millis(5));
+ tx_clone.send(num).unwrap();
+ });
+ }
+
+ // Scope so pool goes out of scope and triggers drop after sending messages
+ drop(pool);
+
+ // We should still be able to receive all 4 messages
+ let mut results = vec![];
+ for _ in 0..4 {
+ results.push(rx.recv().expect("Failed to receive result"));
+ }
+ results.sort();
+ assert_eq!(results, vec![0, 1, 2, 3]);
+ }
+
+ /// A test that ensures multiple data types can be handled.
+ /// In reality, we just need T: Send + 'static, but let's be explicit.
+ #[test]
+ fn test_execute_string_tasks() {
+ let pool = ThreadPool::new(2);
+ let (tx, rx) = mpsc::channel();
+
+ let strings = vec!["alpha", "beta", "gamma", "delta"];
+ for s in strings {
+ let tx_clone = tx.clone();
+ pool.execute(s.to_string(), move |st: String| {
+ // Just pass the string back
+ tx_clone.send(st).unwrap();
+ });
+ }
+
+ // Collect results
+ let mut results = Vec::new();
+ for _ in 0..4 {
+ let r = rx.recv().unwrap();
+ results.push(r);
+ }
+ results.sort();
+ assert_eq!(results, vec!["alpha", "beta", "delta", "gamma"]);
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
@@ -0,0 +1,2 @@
+
+mod jobs;