nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

threadpool.h (2272B)


      1 
      2 #ifndef THREADPOOL_H
      3 #define THREADPOOL_H
      4 
#include <sched.h>

#include "protected_queue.h"
      6 
      7 struct thread
      8 {
      9 	pthread_t thread_id;
     10 	struct prot_queue inbox;
     11 	void *qmem;
     12 	void *ctx;
     13 };
     14 
     15 struct threadpool
     16 {
     17 	int num_threads;
     18 	struct thread *pool;
     19 	int next_thread;
     20 	void *quit_msg;
     21 };
     22 
     23 static int threadpool_init(struct threadpool *tp, int num_threads,
     24 			   int q_elem_size, int q_num_elems,
     25 			   void *quit_msg, void *ctx, void* (*thread_fn)(void*))
     26 {
     27 	int i;
     28 	struct thread *t;
     29 
     30 	if (num_threads <= 0)
     31 		return 0;
     32 
     33 	tp->num_threads = num_threads;
     34 	tp->pool = malloc(sizeof(*tp->pool) * num_threads);
     35 	tp->quit_msg = quit_msg;
     36 	tp->next_thread = -1;
     37 
     38 	if (tp->pool == NULL) {
     39 		fprintf(stderr, "threadpool_init: couldn't allocate memory for pool");
     40 		return 0;
     41 	}
     42 
     43 	for (i = 0; i < num_threads; i++) {
     44 		t = &tp->pool[i];
     45 		t->qmem = malloc(q_elem_size * q_num_elems);
     46 		t->ctx = ctx;
     47 
     48 		if (t->qmem == NULL) {
     49 			fprintf(stderr, "threadpool_init: couldn't allocate memory for queue");
     50 			return 0;
     51 		}
     52 
     53 		if (!prot_queue_init(&t->inbox, t->qmem, q_elem_size * q_num_elems, q_elem_size)) {
     54 			fprintf(stderr, "threadpool_init: couldn't init queue. buffer alignment is wrong.");
     55 			return 0;
     56 		}
     57 
     58 		if (pthread_create(&t->thread_id, NULL, thread_fn, t) != 0) {
     59 			fprintf(stderr, "threadpool_init: failed to create thread\n");
     60 			return 0;
     61 		}
     62 	}
     63 
     64 	return 1;
     65 }
     66 
     67 static inline struct thread *threadpool_next_thread(struct threadpool *tp)
     68 {
     69 	tp->next_thread = (tp->next_thread + 1) % tp->num_threads;
     70 	return &tp->pool[tp->next_thread];
     71 }
     72 
     73 static inline int threadpool_dispatch(struct threadpool *tp, void *msg)
     74 {
     75 	struct thread *t = threadpool_next_thread(tp);
     76 	return prot_queue_push(&t->inbox, msg);
     77 }
     78 
     79 static inline int threadpool_dispatch_all(struct threadpool *tp, void *msgs,
     80 					  int num_msgs)
     81 {
     82 	struct thread *t = threadpool_next_thread(tp);
     83 	return prot_queue_push_all(&t->inbox, msgs, num_msgs);
     84 }
     85 
     86 static inline void threadpool_destroy(struct threadpool *tp)
     87 {
     88 	struct thread *t;
     89 
     90 	for (int i = 0; i < tp->num_threads; i++) {
     91 		t = &tp->pool[i];
     92 		if (!prot_queue_push(&t->inbox, tp->quit_msg)) {
     93 			pthread_exit(&t->thread_id);
     94 		} else {
     95 			pthread_join(t->thread_id, NULL);
     96 		}
     97 		prot_queue_destroy(&t->inbox);
     98 		free(t->qmem);
     99 	}
    100 	free(tp->pool);
    101 }
    102 
    103 #endif // THREADPOOL_H