nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

protected_queue.h (6126B)


      1 /*
      2  *    This header file provides a thread-safe queue implementation for generic
      3  *    data elements. It uses POSIX threads (pthreads) to ensure thread safety.
      4  *    The queue allows for pushing and popping elements, with the ability to
      5  *    block or non-block on pop operations. Users are responsible for providing
      6  *    memory for the queue buffer and ensuring its correct lifespan.
      7  *
      8  *         Author:  William Casarin
      9  *         Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h
     10  */
     11 
     12 #ifndef PROT_QUEUE_H
     13 #define PROT_QUEUE_H
     14 
     15 #include <pthread.h>
     16 #include <stdbool.h>
     17 #include <stddef.h>
     18 #include <string.h>
     19 #include "cursor.h"
     20 #include "util.h"
     21 
     22 #define BUFFER_SIZE 100
     23 
     24 /* 
     25  * The prot_queue structure represents a thread-safe queue that can hold
     26  * generic data elements.
     27  */
     28 struct prot_queue {
     29 	unsigned char *buf;
     30 	size_t buflen;
     31 
     32 	int head;
     33 	int tail;
     34 	int count;
     35 	int elem_size;
     36 
     37 	pthread_mutex_t mutex;
     38 	pthread_cond_t cond;
     39 };
     40 
     41 
     42 /* 
     43  * Initialize the queue. 
     44  * Params:
     45  * q         - Pointer to the queue.
     46  * buf       - Buffer for holding data elements.
     47  * buflen    - Length of the buffer.
     48  * elem_size - Size of each data element.
     49  * Returns 1 if successful, 0 otherwise.
     50  */
     51 static inline int prot_queue_init(struct prot_queue* q, void* buf,
     52 				  size_t buflen, int elem_size)
     53 {
     54 	// buffer elements must fit nicely in the buffer
     55 	if (buflen == 0 || buflen % elem_size != 0)
     56 		return 0;
     57 
     58 	q->head = 0;
     59 	q->tail = 0;
     60 	q->count = 0;
     61 	q->buf = buf;
     62 	q->buflen = buflen;
     63 	q->elem_size = elem_size;
     64 
     65 	pthread_mutex_init(&q->mutex, NULL);
     66 	pthread_cond_init(&q->cond, NULL);
     67 
     68 	return 1;
     69 }
     70 
     71 /* 
     72  * Return the capacity of the queue.
     73  * q    - Pointer to the queue.
     74  */
     75 static inline size_t prot_queue_capacity(struct prot_queue *q) {
     76 	return q->buflen / q->elem_size;
     77 }
     78 
     79 /* 
     80  * Push an element onto the queue.
     81  * Params:
     82  * q    - Pointer to the queue.
     83  * data - Pointer to the data element to be pushed.
     84  *
     85  * Returns 1 if successful, 0 if the queue is full.
     86  */
     87 static int prot_queue_push(struct prot_queue* q, void *data)
     88 {
     89 	int cap;
     90 
     91 	pthread_mutex_lock(&q->mutex);
     92 
     93 	cap = prot_queue_capacity(q);
     94 	if (q->count == cap) {
     95 		// only signal if the push was sucessful
     96 		pthread_mutex_unlock(&q->mutex);
     97 		return 0;
     98 	}
     99 
    100 	memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size);
    101 	q->tail = (q->tail + 1) % cap;
    102 	q->count++;
    103 
    104 	pthread_cond_signal(&q->cond);
    105 	pthread_mutex_unlock(&q->mutex);
    106 
    107 	return 1;
    108 }
    109 
    110 /*
    111  * Push multiple elements onto the queue.
    112  * Params:
    113  * q      - Pointer to the queue.
    114  * data   - Pointer to the data elements to be pushed.
    115  * count  - Number of elements to push.
    116  *
    117  * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space.
    118  */
    119 static int prot_queue_push_all(struct prot_queue* q, void *data, int count)
    120 {
    121 	int cap;
    122 	int first_copy_count, second_copy_count;
    123 
    124 	pthread_mutex_lock(&q->mutex);
    125 
    126 	cap = prot_queue_capacity(q);
    127 	if (q->count + count > cap) {
    128 		pthread_mutex_unlock(&q->mutex);
    129 		return 0; // Return failure if the queue is full
    130 	}
    131 
    132 	first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer
    133 	second_copy_count = count - first_copy_count; // Remaining elements if wrap around
    134 
    135 	memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size);
    136 	q->tail = (q->tail + first_copy_count) % cap;
    137 
    138 	if (second_copy_count > 0) {
    139 		// If there is a wrap around, copy the remaining elements
    140 		memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size);
    141 		q->tail = (q->tail + second_copy_count) % cap;
    142 	}
    143 
    144 	q->count += count;
    145 
    146 	pthread_cond_signal(&q->cond); // Signal a waiting thread
    147 	pthread_mutex_unlock(&q->mutex);
    148 
    149 	return count;
    150 }
    151 
    152 /* 
    153  * Try to pop an element from the queue without blocking.
    154  * Params:
    155  * q    - Pointer to the queue.
    156  * data - Pointer to where the popped data will be stored.
    157  * Returns 1 if successful, 0 if the queue is empty.
    158  */
    159 static inline int prot_queue_try_pop(struct prot_queue *q, void *data) {
    160 	pthread_mutex_lock(&q->mutex);
    161 
    162 	if (q->count == 0) {
    163 		pthread_mutex_unlock(&q->mutex);
    164 		return 0;
    165 	}
    166 
    167 	memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size);
    168 	q->head = (q->head + 1) % prot_queue_capacity(q);
    169 	q->count--;
    170 
    171 	pthread_mutex_unlock(&q->mutex);
    172 	return 1;
    173 }
    174 
    175 /* 
    176  * Wait until we have elements, and then pop multiple elements from the queue
    177  * up to the specified maximum.
    178  *
    179  * Params:
    180  * q		 - Pointer to the queue.
    181  * buffer	 - Pointer to the buffer where popped data will be stored.
    182  * max_items - Maximum number of items to pop from the queue.
    183  * Returns the actual number of items popped.
    184  */
    185 static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) {
    186 	pthread_mutex_lock(&q->mutex);
    187 
    188 	// Wait until there's at least one item to pop
    189 	while (q->count == 0) {
    190 		pthread_cond_wait(&q->cond, &q->mutex);
    191 	}
    192 
    193 	int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
    194 	int items_to_pop = min(q->count, max_items);
    195 	items_to_pop = min(items_to_pop, items_until_end);
    196 
    197 	memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
    198 	q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
    199 	q->count -= items_to_pop;
    200 
    201 	pthread_mutex_unlock(&q->mutex);
    202 
    203 	return items_to_pop;
    204 }
    205 
    206 /* 
    207  * Pop an element from the queue. Blocks if the queue is empty.
    208  * Params:
    209  * q    - Pointer to the queue.
    210  * data - Pointer to where the popped data will be stored.
    211  */
    212 static inline void prot_queue_pop(struct prot_queue *q, void *data) {
    213 	pthread_mutex_lock(&q->mutex);
    214 
    215 	while (q->count == 0)
    216 		pthread_cond_wait(&q->cond, &q->mutex);
    217 
    218 	memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size);
    219 	q->head = (q->head + 1) % prot_queue_capacity(q);
    220 	q->count--;
    221 
    222 	pthread_mutex_unlock(&q->mutex);
    223 }
    224 
    225 /* 
    226  * Destroy the queue. Releases resources associated with the queue.
    227  * Params:
    228  * q - Pointer to the queue.
    229  */
    230 static inline void prot_queue_destroy(struct prot_queue* q) {
    231 	pthread_mutex_destroy(&q->mutex);
    232 	pthread_cond_destroy(&q->cond);
    233 }
    234 
    235 #endif // PROT_QUEUE_H