nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

protected_queue.h (6409B)


      1 /*
      2  *    This header file provides a thread-safe queue implementation for generic
      3  *    data elements. It uses POSIX threads (pthreads) to ensure thread safety.
      4  *    The queue allows for pushing and popping elements, with both blocking
      5  *    and non-blocking pop operations. Users are responsible for providing
      6  *    memory for the queue buffer and ensuring its correct lifespan.
      7  *
      8  *         Author:  William Casarin
      9  *         Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h
     10  */
     11 
     12 #ifndef PROT_QUEUE_H
     13 #define PROT_QUEUE_H
     14 
     15 #include <pthread.h>
     16 #include <stdbool.h>
     17 #include <stddef.h>
     18 #include <string.h>
     19 #include "cursor.h"
     20 #include "util.h"
     21 
     22 /* 
     23  * The prot_queue structure represents a thread-safe queue that can hold
     24  * generic data elements.
     25  */
     26 struct prot_queue {
     27 	unsigned char *buf; /* caller-provided element storage; assigned in prot_queue_init */
     28 	size_t buflen; /* size of buf in bytes */
     29 
     30 	int head; /* element index of the next item to pop */
     31 	int tail; /* element index of the next free slot to push into */
     32 	int count; /* number of elements currently stored */
     33 	int elem_size; /* size of one element in bytes */
     34 
     35 	pthread_mutex_t mutex; /* held while head, tail and count are read or modified */
     36 	pthread_cond_t cond; /* signaled by the push functions; blocking pops wait on it */
     37 };
     38 
     39 
     40 /* 
     41  * Initialize the queue. 
     42  * Params:
     43  * q         - Pointer to the queue.
     44  * buf       - Buffer for holding data elements.
     45  * buflen    - Length of the buffer.
     46  * elem_size - Size of each data element.
     47  * Returns 1 if successful, 0 otherwise.
     48  */
     49 static inline int prot_queue_init(struct prot_queue* q, void* buf,
     50 				  size_t buflen, int elem_size)
     51 {
     52 	// buffer elements must fit nicely in the buffer
     53 	if (buflen == 0 || buflen % elem_size != 0)
     54 		assert(!"queue elements don't fit nicely");
     55 
     56 	q->head = 0;
     57 	q->tail = 0;
     58 	q->count = 0;
     59 	q->buf = buf;
     60 	q->buflen = buflen;
     61 	q->elem_size = elem_size;
     62 
     63 	pthread_mutex_init(&q->mutex, NULL);
     64 	pthread_cond_init(&q->cond, NULL);
     65 
     66 	return 1;
     67 }
     68 
     69 /* 
     70  * Return the capacity of the queue.
     71  * q    - Pointer to the queue.
     72  */
     73 static inline size_t prot_queue_capacity(struct prot_queue *q) {
     74 	return q->buflen / q->elem_size;
     75 }
     76 
     77 /* 
     78  * Push an element onto the queue.
     79  * Params:
     80  * q    - Pointer to the queue.
     81  * data - Pointer to the data element to be pushed.
     82  *
     83  * Returns 1 if successful, 0 if the queue is full.
     84  */
     85 static int prot_queue_push(struct prot_queue* q, void *data)
     86 {
     87 	int cap;
     88 
     89 	pthread_mutex_lock(&q->mutex);
     90 
     91 	cap = prot_queue_capacity(q);
     92 	if (q->count == cap) {
     93 		// only signal if the push was sucessful
     94 		pthread_mutex_unlock(&q->mutex);
     95 		return 0;
     96 	}
     97 
     98 	memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size);
     99 	q->tail = (q->tail + 1) % cap;
    100 	q->count++;
    101 
    102 	pthread_cond_signal(&q->cond);
    103 	pthread_mutex_unlock(&q->mutex);
    104 
    105 	return 1;
    106 }
    107 
    108 /*
    109  * Push multiple elements onto the queue.
    110  * Params:
    111  * q      - Pointer to the queue.
    112  * data   - Pointer to the data elements to be pushed.
    113  * count  - Number of elements to push.
    114  *
    115  * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space.
    116  */
    117 static int prot_queue_push_all(struct prot_queue* q, void *data, int count)
    118 {
    119 	int cap;
    120 	int first_copy_count, second_copy_count;
    121 
    122 	pthread_mutex_lock(&q->mutex);
    123 
    124 	cap = prot_queue_capacity(q);
    125 	if (q->count + count > cap) {
    126 		pthread_mutex_unlock(&q->mutex);
    127 		return 0; // Return failure if the queue is full
    128 	}
    129 
    130 	first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer
    131 	second_copy_count = count - first_copy_count; // Remaining elements if wrap around
    132 
    133 	memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size);
    134 	q->tail = (q->tail + first_copy_count) % cap;
    135 
    136 	if (second_copy_count > 0) {
    137 		// If there is a wrap around, copy the remaining elements
    138 		memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size);
    139 		q->tail = (q->tail + second_copy_count) % cap;
    140 	}
    141 
    142 	q->count += count;
    143 
    144 	pthread_cond_signal(&q->cond); // Signal a waiting thread
    145 	pthread_mutex_unlock(&q->mutex);
    146 
    147 	return count;
    148 }
    149 
    150 /* 
    151  * Try to pop an element from the queue without blocking.
    152  * Params:
    153  * q    - Pointer to the queue.
    154  * data - Pointer to where the popped data will be stored.
    155  * Returns 1 if successful, 0 if the queue is empty.
    156  */
    157 static inline int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items) {
    158 	int items_to_pop, items_until_end;
    159 
    160 	pthread_mutex_lock(&q->mutex);
    161 
    162 	if (q->count == 0) {
    163 		pthread_mutex_unlock(&q->mutex);
    164 		return 0;
    165 	}
    166 
    167 	items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
    168 	items_to_pop = min(q->count, max_items);
    169 	items_to_pop = min(items_to_pop, items_until_end);
    170 
    171 	memcpy(data, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
    172 	q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
    173 	q->count -= items_to_pop;
    174 
    175 	pthread_mutex_unlock(&q->mutex);
    176 	return items_to_pop;
    177 }
    178 
    179 /* 
    180  * Wait until we have elements, and then pop multiple elements from the queue
    181  * up to the specified maximum.
    182  *
    183  * Params:
    184  * q		 - Pointer to the queue.
    185  * buffer	 - Pointer to the buffer where popped data will be stored.
    186  * max_items - Maximum number of items to pop from the queue.
    187  * Returns the actual number of items popped.
    188  */
    189 static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) {
    190 	pthread_mutex_lock(&q->mutex);
    191 
    192 	// Wait until there's at least one item to pop
    193 	while (q->count == 0) {
    194 		pthread_cond_wait(&q->cond, &q->mutex);
    195 	}
    196 
    197 	int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
    198 	int items_to_pop = min(q->count, max_items);
    199 	items_to_pop = min(items_to_pop, items_until_end);
    200 
    201 	memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
    202 	q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
    203 	q->count -= items_to_pop;
    204 
    205 	pthread_mutex_unlock(&q->mutex);
    206 
    207 	return items_to_pop;
    208 }
    209 
    210 /* 
    211  * Pop an element from the queue. Blocks if the queue is empty.
    212  * Params:
    213  * q    - Pointer to the queue.
    214  * data - Pointer to where the popped data will be stored.
    215  */
    216 static inline void prot_queue_pop(struct prot_queue *q, void *data) {
    217 	pthread_mutex_lock(&q->mutex);
    218 
    219 	while (q->count == 0)
    220 		pthread_cond_wait(&q->cond, &q->mutex);
    221 
    222 	memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size);
    223 	q->head = (q->head + 1) % prot_queue_capacity(q);
    224 	q->count--;
    225 
    226 	pthread_mutex_unlock(&q->mutex);
    227 }
    228 
    229 /* 
    230  * Destroy the queue. Releases resources associated with the queue.
    231  * Params:
    232  * q - Pointer to the queue.
    233  */
    234 static inline void prot_queue_destroy(struct prot_queue* q) {
    235 	pthread_mutex_destroy(&q->mutex);
    236 	pthread_cond_destroy(&q->cond);
    237 }
    238 
    239 #endif // PROT_QUEUE_H