threadpool.h (2272B)
1 2 #ifndef THREADPOOL_H 3 #define THREADPOOL_H 4 5 #include "protected_queue.h" 6 7 struct thread 8 { 9 pthread_t thread_id; 10 struct prot_queue inbox; 11 void *qmem; 12 void *ctx; 13 }; 14 15 struct threadpool 16 { 17 int num_threads; 18 struct thread *pool; 19 int next_thread; 20 void *quit_msg; 21 }; 22 23 static int threadpool_init(struct threadpool *tp, int num_threads, 24 int q_elem_size, int q_num_elems, 25 void *quit_msg, void *ctx, void* (*thread_fn)(void*)) 26 { 27 int i; 28 struct thread *t; 29 30 if (num_threads <= 0) 31 return 0; 32 33 tp->num_threads = num_threads; 34 tp->pool = malloc(sizeof(*tp->pool) * num_threads); 35 tp->quit_msg = quit_msg; 36 tp->next_thread = -1; 37 38 if (tp->pool == NULL) { 39 fprintf(stderr, "threadpool_init: couldn't allocate memory for pool"); 40 return 0; 41 } 42 43 for (i = 0; i < num_threads; i++) { 44 t = &tp->pool[i]; 45 t->qmem = malloc(q_elem_size * q_num_elems); 46 t->ctx = ctx; 47 48 if (t->qmem == NULL) { 49 fprintf(stderr, "threadpool_init: couldn't allocate memory for queue"); 50 return 0; 51 } 52 53 if (!prot_queue_init(&t->inbox, t->qmem, q_elem_size * q_num_elems, q_elem_size)) { 54 fprintf(stderr, "threadpool_init: couldn't init queue. buffer alignment is wrong."); 55 return 0; 56 } 57 58 if (pthread_create(&t->thread_id, NULL, thread_fn, t) != 0) { 59 fprintf(stderr, "threadpool_init: failed to create thread\n"); 60 return 0; 61 } 62 } 63 64 return 1; 65 } 66 67 static inline struct thread *threadpool_next_thread(struct threadpool *tp) 68 { 69 tp->next_thread = (tp->next_thread + 1) % tp->num_threads; 70 return &tp->pool[tp->next_thread]; 71 } 72 73 static inline int threadpool_dispatch(struct threadpool *tp, void *msg) 74 { 75 struct thread *t = threadpool_next_thread(tp); 76 return prot_queue_push(&t->inbox, msg); 77 } 78 79 static inline int threadpool_dispatch_all(struct threadpool *tp, void *msgs, 80 int num_msgs) 81 { 82 struct thread *t = threadpool_next_thread(tp); 83 return prot_queue_push_all(&t->inbox, msgs, num_msgs); 84 } 85 86 static inline void threadpool_destroy(struct threadpool *tp) 87 { 88 struct thread *t; 89 90 for (int i = 0; i < tp->num_threads; i++) { 91 t = &tp->pool[i]; 92 if (!prot_queue_push(&t->inbox, tp->quit_msg)) { 93 pthread_exit(&t->thread_id); 94 } else { 95 pthread_join(t->thread_id, NULL); 96 } 97 prot_queue_destroy(&t->inbox); 98 free(t->qmem); 99 } 100 free(tp->pool); 101 } 102 103 #endif // THREADPOOL_H