nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

protected_queue.h (6491B)


      1 /*
      2  *    This header file provides a thread-safe queue implementation for generic
      3  *    data elements. It uses POSIX threads (pthreads) to ensure thread safety.
      4  *    The queue allows for pushing and popping elements, with the ability to
      5  *    block or non-block on pop operations. Users are responsible for providing
      6  *    memory for the queue buffer and ensuring its correct lifespan.
      7  *
      8  *         Author:  William Casarin
      9  *         Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h
     10  */
     11 
     12 #ifndef PROT_QUEUE_H
     13 #define PROT_QUEUE_H
     14 
     15 #include <stdbool.h>
     16 #include <stddef.h>
     17 #include <string.h>
     18 #include "cursor.h"
     19 #include "util.h"
     20 #include "thread.h"
     21 
     22 #define max(a,b) ((a) > (b) ? (a) : (b))
     23 #define min(a,b) ((a) < (b) ? (a) : (b))
     24 
     25 /* 
     26  * The prot_queue structure represents a thread-safe queue that can hold
     27  * generic data elements.
     28  */
     29 struct prot_queue {
     30 	unsigned char *buf;
     31 	size_t buflen;
     32 
     33 	int head;
     34 	int tail;
     35 	int count;
     36 	int elem_size;
     37 
     38 	pthread_mutex_t mutex;
     39 	pthread_cond_t cond;
     40 };
     41 
     42 
     43 /* 
     44  * Initialize the queue. 
     45  * Params:
     46  * q         - Pointer to the queue.
     47  * buf       - Buffer for holding data elements.
     48  * buflen    - Length of the buffer.
     49  * elem_size - Size of each data element.
     50  * Returns 1 if successful, 0 otherwise.
     51  */
     52 static inline int prot_queue_init(struct prot_queue* q, void* buf,
     53 				  size_t buflen, int elem_size)
     54 {
     55 	// buffer elements must fit nicely in the buffer
     56 	if (buflen == 0 || buflen % elem_size != 0)
     57 		assert(!"queue elements don't fit nicely");
     58 
     59 	q->head = 0;
     60 	q->tail = 0;
     61 	q->count = 0;
     62 	q->buf = buf;
     63 	q->buflen = buflen;
     64 	q->elem_size = elem_size;
     65 
     66 	pthread_mutex_init(&q->mutex, NULL);
     67 	pthread_cond_init(&q->cond, NULL);
     68 
     69 	return 1;
     70 }
     71 
     72 /* 
     73  * Return the capacity of the queue.
     74  * q    - Pointer to the queue.
     75  */
     76 static inline size_t prot_queue_capacity(struct prot_queue *q) {
     77 	return q->buflen / q->elem_size;
     78 }
     79 
     80 /* 
     81  * Push an element onto the queue.
     82  * Params:
     83  * q    - Pointer to the queue.
     84  * data - Pointer to the data element to be pushed.
     85  *
     86  * Returns 1 if successful, 0 if the queue is full.
     87  */
     88 static int prot_queue_push(struct prot_queue* q, void *data)
     89 {
     90 	int cap;
     91 
     92 	pthread_mutex_lock(&q->mutex);
     93 
     94 	cap = prot_queue_capacity(q);
     95 	if (q->count == cap) {
     96 		// only signal if the push was sucessful
     97 		pthread_mutex_unlock(&q->mutex);
     98 		return 0;
     99 	}
    100 
    101 	memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size);
    102 	q->tail = (q->tail + 1) % cap;
    103 	q->count++;
    104 
    105 	pthread_cond_signal(&q->cond);
    106 	pthread_mutex_unlock(&q->mutex);
    107 
    108 	return 1;
    109 }
    110 
    111 /*
    112  * Push multiple elements onto the queue.
    113  * Params:
    114  * q      - Pointer to the queue.
    115  * data   - Pointer to the data elements to be pushed.
    116  * count  - Number of elements to push.
    117  *
    118  * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space.
    119  */
    120 static int prot_queue_push_all(struct prot_queue* q, void *data, int count)
    121 {
    122 	int cap;
    123 	int first_copy_count, second_copy_count;
    124 
    125 	pthread_mutex_lock(&q->mutex);
    126 
    127 	cap = prot_queue_capacity(q);
    128 	if (q->count + count > cap) {
    129 		pthread_mutex_unlock(&q->mutex);
    130 		return 0; // Return failure if the queue is full
    131 	}
    132 
    133 	first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer
    134 	second_copy_count = count - first_copy_count; // Remaining elements if wrap around
    135 
    136 	memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size);
    137 	q->tail = (q->tail + first_copy_count) % cap;
    138 
    139 	if (second_copy_count > 0) {
    140 		// If there is a wrap around, copy the remaining elements
    141 		memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size);
    142 		q->tail = (q->tail + second_copy_count) % cap;
    143 	}
    144 
    145 	q->count += count;
    146 
    147 	pthread_cond_signal(&q->cond); // Signal a waiting thread
    148 	pthread_mutex_unlock(&q->mutex);
    149 
    150 	return count;
    151 }
    152 
    153 /* 
    154  * Try to pop an element from the queue without blocking.
    155  * Params:
    156  * q    - Pointer to the queue.
    157  * data - Pointer to where the popped data will be stored.
    158  * Returns 1 if successful, 0 if the queue is empty.
    159  */
    160 static inline int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items) {
    161 	int items_to_pop, items_until_end;
    162 
    163 	pthread_mutex_lock(&q->mutex);
    164 
    165 	if (q->count == 0) {
    166 		pthread_mutex_unlock(&q->mutex);
    167 		return 0;
    168 	}
    169 
    170 	items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
    171 	items_to_pop = min(q->count, max_items);
    172 	items_to_pop = min(items_to_pop, items_until_end);
    173 
    174 	memcpy(data, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
    175 	q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
    176 	q->count -= items_to_pop;
    177 
    178 	pthread_mutex_unlock(&q->mutex);
    179 	return items_to_pop;
    180 }
    181 
    182 /* 
    183  * Wait until we have elements, and then pop multiple elements from the queue
    184  * up to the specified maximum.
    185  *
    186  * Params:
    187  * q		 - Pointer to the queue.
    188  * buffer	 - Pointer to the buffer where popped data will be stored.
    189  * max_items - Maximum number of items to pop from the queue.
    190  * Returns the actual number of items popped.
    191  */
    192 static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) {
    193 	pthread_mutex_lock(&q->mutex);
    194 
    195 	// Wait until there's at least one item to pop
    196 	while (q->count == 0) {
    197 		pthread_cond_wait(&q->cond, &q->mutex);
    198 	}
    199 
    200 	int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
    201 	int items_to_pop = min(q->count, max_items);
    202 	items_to_pop = min(items_to_pop, items_until_end);
    203 
    204 	memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
    205 	q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
    206 	q->count -= items_to_pop;
    207 
    208 	pthread_mutex_unlock(&q->mutex);
    209 
    210 	return items_to_pop;
    211 }
    212 
    213 /* 
    214  * Pop an element from the queue. Blocks if the queue is empty.
    215  * Params:
    216  * q    - Pointer to the queue.
    217  * data - Pointer to where the popped data will be stored.
    218  */
    219 static inline void prot_queue_pop(struct prot_queue *q, void *data) {
    220 	pthread_mutex_lock(&q->mutex);
    221 
    222 	while (q->count == 0)
    223 		pthread_cond_wait(&q->cond, &q->mutex);
    224 
    225 	memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size);
    226 	q->head = (q->head + 1) % prot_queue_capacity(q);
    227 	q->count--;
    228 
    229 	pthread_mutex_unlock(&q->mutex);
    230 }
    231 
    232 /* 
    233  * Destroy the queue. Releases resources associated with the queue.
    234  * Params:
    235  * q - Pointer to the queue.
    236  */
    237 static inline void prot_queue_destroy(struct prot_queue* q) {
    238 	pthread_mutex_destroy(&q->mutex);
    239 	pthread_cond_destroy(&q->cond);
    240 }
    241 
    242 #endif // PROT_QUEUE_H