protected_queue.h (6409B)
1 /* 2 * This header file provides a thread-safe queue implementation for generic 3 * data elements. It uses POSIX threads (pthreads) to ensure thread safety. 4 * The queue allows for pushing and popping elements, with the ability to 5 * block or non-block on pop operations. Users are responsible for providing 6 * memory for the queue buffer and ensuring its correct lifespan. 7 * 8 * Author: William Casarin 9 * Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h 10 */ 11 12 #ifndef PROT_QUEUE_H 13 #define PROT_QUEUE_H 14 15 #include <pthread.h> 16 #include <stdbool.h> 17 #include <stddef.h> 18 #include <string.h> 19 #include "cursor.h" 20 #include "util.h" 21 22 /* 23 * The prot_queue structure represents a thread-safe queue that can hold 24 * generic data elements. 25 */ 26 struct prot_queue { 27 unsigned char *buf; 28 size_t buflen; 29 30 int head; 31 int tail; 32 int count; 33 int elem_size; 34 35 pthread_mutex_t mutex; 36 pthread_cond_t cond; 37 }; 38 39 40 /* 41 * Initialize the queue. 42 * Params: 43 * q - Pointer to the queue. 44 * buf - Buffer for holding data elements. 45 * buflen - Length of the buffer. 46 * elem_size - Size of each data element. 47 * Returns 1 if successful, 0 otherwise. 48 */ 49 static inline int prot_queue_init(struct prot_queue* q, void* buf, 50 size_t buflen, int elem_size) 51 { 52 // buffer elements must fit nicely in the buffer 53 if (buflen == 0 || buflen % elem_size != 0) 54 assert(!"queue elements don't fit nicely"); 55 56 q->head = 0; 57 q->tail = 0; 58 q->count = 0; 59 q->buf = buf; 60 q->buflen = buflen; 61 q->elem_size = elem_size; 62 63 pthread_mutex_init(&q->mutex, NULL); 64 pthread_cond_init(&q->cond, NULL); 65 66 return 1; 67 } 68 69 /* 70 * Return the capacity of the queue. 71 * q - Pointer to the queue. 72 */ 73 static inline size_t prot_queue_capacity(struct prot_queue *q) { 74 return q->buflen / q->elem_size; 75 } 76 77 /* 78 * Push an element onto the queue. 79 * Params: 80 * q - Pointer to the queue. 81 * data - Pointer to the data element to be pushed. 82 * 83 * Returns 1 if successful, 0 if the queue is full. 84 */ 85 static int prot_queue_push(struct prot_queue* q, void *data) 86 { 87 int cap; 88 89 pthread_mutex_lock(&q->mutex); 90 91 cap = prot_queue_capacity(q); 92 if (q->count == cap) { 93 // only signal if the push was sucessful 94 pthread_mutex_unlock(&q->mutex); 95 return 0; 96 } 97 98 memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size); 99 q->tail = (q->tail + 1) % cap; 100 q->count++; 101 102 pthread_cond_signal(&q->cond); 103 pthread_mutex_unlock(&q->mutex); 104 105 return 1; 106 } 107 108 /* 109 * Push multiple elements onto the queue. 110 * Params: 111 * q - Pointer to the queue. 112 * data - Pointer to the data elements to be pushed. 113 * count - Number of elements to push. 114 * 115 * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space. 116 */ 117 static int prot_queue_push_all(struct prot_queue* q, void *data, int count) 118 { 119 int cap; 120 int first_copy_count, second_copy_count; 121 122 pthread_mutex_lock(&q->mutex); 123 124 cap = prot_queue_capacity(q); 125 if (q->count + count > cap) { 126 pthread_mutex_unlock(&q->mutex); 127 return 0; // Return failure if the queue is full 128 } 129 130 first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer 131 second_copy_count = count - first_copy_count; // Remaining elements if wrap around 132 133 memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size); 134 q->tail = (q->tail + first_copy_count) % cap; 135 136 if (second_copy_count > 0) { 137 // If there is a wrap around, copy the remaining elements 138 memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size); 139 q->tail = (q->tail + second_copy_count) % cap; 140 } 141 142 q->count += count; 143 144 pthread_cond_signal(&q->cond); // Signal a waiting thread 145 pthread_mutex_unlock(&q->mutex); 146 147 return count; 148 } 149 150 /* 151 * Try to pop an element from the queue without blocking. 152 * Params: 153 * q - Pointer to the queue. 154 * data - Pointer to where the popped data will be stored. 155 * Returns 1 if successful, 0 if the queue is empty. 156 */ 157 static inline int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items) { 158 int items_to_pop, items_until_end; 159 160 pthread_mutex_lock(&q->mutex); 161 162 if (q->count == 0) { 163 pthread_mutex_unlock(&q->mutex); 164 return 0; 165 } 166 167 items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; 168 items_to_pop = min(q->count, max_items); 169 items_to_pop = min(items_to_pop, items_until_end); 170 171 memcpy(data, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); 172 q->head = (q->head + items_to_pop) % prot_queue_capacity(q); 173 q->count -= items_to_pop; 174 175 pthread_mutex_unlock(&q->mutex); 176 return items_to_pop; 177 } 178 179 /* 180 * Wait until we have elements, and then pop multiple elements from the queue 181 * up to the specified maximum. 182 * 183 * Params: 184 * q - Pointer to the queue. 185 * buffer - Pointer to the buffer where popped data will be stored. 186 * max_items - Maximum number of items to pop from the queue. 187 * Returns the actual number of items popped. 188 */ 189 static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) { 190 pthread_mutex_lock(&q->mutex); 191 192 // Wait until there's at least one item to pop 193 while (q->count == 0) { 194 pthread_cond_wait(&q->cond, &q->mutex); 195 } 196 197 int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; 198 int items_to_pop = min(q->count, max_items); 199 items_to_pop = min(items_to_pop, items_until_end); 200 201 memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); 202 q->head = (q->head + items_to_pop) % prot_queue_capacity(q); 203 q->count -= items_to_pop; 204 205 pthread_mutex_unlock(&q->mutex); 206 207 return items_to_pop; 208 } 209 210 /* 211 * Pop an element from the queue. Blocks if the queue is empty. 212 * Params: 213 * q - Pointer to the queue. 214 * data - Pointer to where the popped data will be stored. 215 */ 216 static inline void prot_queue_pop(struct prot_queue *q, void *data) { 217 pthread_mutex_lock(&q->mutex); 218 219 while (q->count == 0) 220 pthread_cond_wait(&q->cond, &q->mutex); 221 222 memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); 223 q->head = (q->head + 1) % prot_queue_capacity(q); 224 q->count--; 225 226 pthread_mutex_unlock(&q->mutex); 227 } 228 229 /* 230 * Destroy the queue. Releases resources associated with the queue. 231 * Params: 232 * q - Pointer to the queue. 233 */ 234 static inline void prot_queue_destroy(struct prot_queue* q) { 235 pthread_mutex_destroy(&q->mutex); 236 pthread_cond_destroy(&q->cond); 237 } 238 239 #endif // PROT_QUEUE_H