protected_queue.h (6491B)
1 /* 2 * This header file provides a thread-safe queue implementation for generic 3 * data elements. It uses POSIX threads (pthreads) to ensure thread safety. 4 * The queue allows for pushing and popping elements, with the ability to 5 * block or non-block on pop operations. Users are responsible for providing 6 * memory for the queue buffer and ensuring its correct lifespan. 7 * 8 * Author: William Casarin 9 * Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h 10 */ 11 12 #ifndef PROT_QUEUE_H 13 #define PROT_QUEUE_H 14 15 #include <stdbool.h> 16 #include <stddef.h> 17 #include <string.h> 18 #include "cursor.h" 19 #include "util.h" 20 #include "thread.h" 21 22 #define max(a,b) ((a) > (b) ? (a) : (b)) 23 #define min(a,b) ((a) < (b) ? (a) : (b)) 24 25 /* 26 * The prot_queue structure represents a thread-safe queue that can hold 27 * generic data elements. 28 */ 29 struct prot_queue { 30 unsigned char *buf; 31 size_t buflen; 32 33 int head; 34 int tail; 35 int count; 36 int elem_size; 37 38 pthread_mutex_t mutex; 39 pthread_cond_t cond; 40 }; 41 42 43 /* 44 * Initialize the queue. 45 * Params: 46 * q - Pointer to the queue. 47 * buf - Buffer for holding data elements. 48 * buflen - Length of the buffer. 49 * elem_size - Size of each data element. 50 * Returns 1 if successful, 0 otherwise. 51 */ 52 static inline int prot_queue_init(struct prot_queue* q, void* buf, 53 size_t buflen, int elem_size) 54 { 55 // buffer elements must fit nicely in the buffer 56 if (buflen == 0 || buflen % elem_size != 0) 57 assert(!"queue elements don't fit nicely"); 58 59 q->head = 0; 60 q->tail = 0; 61 q->count = 0; 62 q->buf = buf; 63 q->buflen = buflen; 64 q->elem_size = elem_size; 65 66 pthread_mutex_init(&q->mutex, NULL); 67 pthread_cond_init(&q->cond, NULL); 68 69 return 1; 70 } 71 72 /* 73 * Return the capacity of the queue. 74 * q - Pointer to the queue. 75 */ 76 static inline size_t prot_queue_capacity(struct prot_queue *q) { 77 return q->buflen / q->elem_size; 78 } 79 80 /* 81 * Push an element onto the queue. 82 * Params: 83 * q - Pointer to the queue. 84 * data - Pointer to the data element to be pushed. 85 * 86 * Returns 1 if successful, 0 if the queue is full. 87 */ 88 static int prot_queue_push(struct prot_queue* q, void *data) 89 { 90 int cap; 91 92 pthread_mutex_lock(&q->mutex); 93 94 cap = prot_queue_capacity(q); 95 if (q->count == cap) { 96 // only signal if the push was sucessful 97 pthread_mutex_unlock(&q->mutex); 98 return 0; 99 } 100 101 memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size); 102 q->tail = (q->tail + 1) % cap; 103 q->count++; 104 105 pthread_cond_signal(&q->cond); 106 pthread_mutex_unlock(&q->mutex); 107 108 return 1; 109 } 110 111 /* 112 * Push multiple elements onto the queue. 113 * Params: 114 * q - Pointer to the queue. 115 * data - Pointer to the data elements to be pushed. 116 * count - Number of elements to push. 117 * 118 * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space. 119 */ 120 static int prot_queue_push_all(struct prot_queue* q, void *data, int count) 121 { 122 int cap; 123 int first_copy_count, second_copy_count; 124 125 pthread_mutex_lock(&q->mutex); 126 127 cap = prot_queue_capacity(q); 128 if (q->count + count > cap) { 129 pthread_mutex_unlock(&q->mutex); 130 return 0; // Return failure if the queue is full 131 } 132 133 first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer 134 second_copy_count = count - first_copy_count; // Remaining elements if wrap around 135 136 memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size); 137 q->tail = (q->tail + first_copy_count) % cap; 138 139 if (second_copy_count > 0) { 140 // If there is a wrap around, copy the remaining elements 141 memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size); 142 q->tail = (q->tail + second_copy_count) % cap; 143 } 144 145 q->count += count; 146 147 pthread_cond_signal(&q->cond); // Signal a waiting thread 148 pthread_mutex_unlock(&q->mutex); 149 150 return count; 151 } 152 153 /* 154 * Try to pop an element from the queue without blocking. 155 * Params: 156 * q - Pointer to the queue. 157 * data - Pointer to where the popped data will be stored. 158 * Returns 1 if successful, 0 if the queue is empty. 159 */ 160 static inline int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items) { 161 int items_to_pop, items_until_end; 162 163 pthread_mutex_lock(&q->mutex); 164 165 if (q->count == 0) { 166 pthread_mutex_unlock(&q->mutex); 167 return 0; 168 } 169 170 items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; 171 items_to_pop = min(q->count, max_items); 172 items_to_pop = min(items_to_pop, items_until_end); 173 174 memcpy(data, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); 175 q->head = (q->head + items_to_pop) % prot_queue_capacity(q); 176 q->count -= items_to_pop; 177 178 pthread_mutex_unlock(&q->mutex); 179 return items_to_pop; 180 } 181 182 /* 183 * Wait until we have elements, and then pop multiple elements from the queue 184 * up to the specified maximum. 185 * 186 * Params: 187 * q - Pointer to the queue. 188 * buffer - Pointer to the buffer where popped data will be stored. 189 * max_items - Maximum number of items to pop from the queue. 190 * Returns the actual number of items popped. 191 */ 192 static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) { 193 pthread_mutex_lock(&q->mutex); 194 195 // Wait until there's at least one item to pop 196 while (q->count == 0) { 197 pthread_cond_wait(&q->cond, &q->mutex); 198 } 199 200 int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; 201 int items_to_pop = min(q->count, max_items); 202 items_to_pop = min(items_to_pop, items_until_end); 203 204 memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); 205 q->head = (q->head + items_to_pop) % prot_queue_capacity(q); 206 q->count -= items_to_pop; 207 208 pthread_mutex_unlock(&q->mutex); 209 210 return items_to_pop; 211 } 212 213 /* 214 * Pop an element from the queue. Blocks if the queue is empty. 215 * Params: 216 * q - Pointer to the queue. 217 * data - Pointer to where the popped data will be stored. 218 */ 219 static inline void prot_queue_pop(struct prot_queue *q, void *data) { 220 pthread_mutex_lock(&q->mutex); 221 222 while (q->count == 0) 223 pthread_cond_wait(&q->cond, &q->mutex); 224 225 memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); 226 q->head = (q->head + 1) % prot_queue_capacity(q); 227 q->count--; 228 229 pthread_mutex_unlock(&q->mutex); 230 } 231 232 /* 233 * Destroy the queue. Releases resources associated with the queue. 234 * Params: 235 * q - Pointer to the queue. 236 */ 237 static inline void prot_queue_destroy(struct prot_queue* q) { 238 pthread_mutex_destroy(&q->mutex); 239 pthread_cond_destroy(&q->cond); 240 } 241 242 #endif // PROT_QUEUE_H