protected_queue.h (6126B)
1 /* 2 * This header file provides a thread-safe queue implementation for generic 3 * data elements. It uses POSIX threads (pthreads) to ensure thread safety. 4 * The queue allows for pushing and popping elements, with the ability to 5 * block or non-block on pop operations. Users are responsible for providing 6 * memory for the queue buffer and ensuring its correct lifespan. 7 * 8 * Author: William Casarin 9 * Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h 10 */ 11 12 #ifndef PROT_QUEUE_H 13 #define PROT_QUEUE_H 14 15 #include <pthread.h> 16 #include <stdbool.h> 17 #include <stddef.h> 18 #include <string.h> 19 #include "cursor.h" 20 #include "util.h" 21 22 #define BUFFER_SIZE 100 23 24 /* 25 * The prot_queue structure represents a thread-safe queue that can hold 26 * generic data elements. 27 */ 28 struct prot_queue { 29 unsigned char *buf; 30 size_t buflen; 31 32 int head; 33 int tail; 34 int count; 35 int elem_size; 36 37 pthread_mutex_t mutex; 38 pthread_cond_t cond; 39 }; 40 41 42 /* 43 * Initialize the queue. 44 * Params: 45 * q - Pointer to the queue. 46 * buf - Buffer for holding data elements. 47 * buflen - Length of the buffer. 48 * elem_size - Size of each data element. 49 * Returns 1 if successful, 0 otherwise. 50 */ 51 static inline int prot_queue_init(struct prot_queue* q, void* buf, 52 size_t buflen, int elem_size) 53 { 54 // buffer elements must fit nicely in the buffer 55 if (buflen == 0 || buflen % elem_size != 0) 56 return 0; 57 58 q->head = 0; 59 q->tail = 0; 60 q->count = 0; 61 q->buf = buf; 62 q->buflen = buflen; 63 q->elem_size = elem_size; 64 65 pthread_mutex_init(&q->mutex, NULL); 66 pthread_cond_init(&q->cond, NULL); 67 68 return 1; 69 } 70 71 /* 72 * Return the capacity of the queue. 73 * q - Pointer to the queue. 74 */ 75 static inline size_t prot_queue_capacity(struct prot_queue *q) { 76 return q->buflen / q->elem_size; 77 } 78 79 /* 80 * Push an element onto the queue. 81 * Params: 82 * q - Pointer to the queue. 83 * data - Pointer to the data element to be pushed. 84 * 85 * Returns 1 if successful, 0 if the queue is full. 86 */ 87 static int prot_queue_push(struct prot_queue* q, void *data) 88 { 89 int cap; 90 91 pthread_mutex_lock(&q->mutex); 92 93 cap = prot_queue_capacity(q); 94 if (q->count == cap) { 95 // only signal if the push was sucessful 96 pthread_mutex_unlock(&q->mutex); 97 return 0; 98 } 99 100 memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size); 101 q->tail = (q->tail + 1) % cap; 102 q->count++; 103 104 pthread_cond_signal(&q->cond); 105 pthread_mutex_unlock(&q->mutex); 106 107 return 1; 108 } 109 110 /* 111 * Push multiple elements onto the queue. 112 * Params: 113 * q - Pointer to the queue. 114 * data - Pointer to the data elements to be pushed. 115 * count - Number of elements to push. 116 * 117 * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space. 118 */ 119 static int prot_queue_push_all(struct prot_queue* q, void *data, int count) 120 { 121 int cap; 122 int first_copy_count, second_copy_count; 123 124 pthread_mutex_lock(&q->mutex); 125 126 cap = prot_queue_capacity(q); 127 if (q->count + count > cap) { 128 pthread_mutex_unlock(&q->mutex); 129 return 0; // Return failure if the queue is full 130 } 131 132 first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer 133 second_copy_count = count - first_copy_count; // Remaining elements if wrap around 134 135 memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size); 136 q->tail = (q->tail + first_copy_count) % cap; 137 138 if (second_copy_count > 0) { 139 // If there is a wrap around, copy the remaining elements 140 memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size); 141 q->tail = (q->tail + second_copy_count) % cap; 142 } 143 144 q->count += count; 145 146 pthread_cond_signal(&q->cond); // Signal a waiting thread 147 pthread_mutex_unlock(&q->mutex); 148 149 return count; 150 } 151 152 /* 153 * Try to pop an element from the queue without blocking. 154 * Params: 155 * q - Pointer to the queue. 156 * data - Pointer to where the popped data will be stored. 157 * Returns 1 if successful, 0 if the queue is empty. 158 */ 159 static inline int prot_queue_try_pop(struct prot_queue *q, void *data) { 160 pthread_mutex_lock(&q->mutex); 161 162 if (q->count == 0) { 163 pthread_mutex_unlock(&q->mutex); 164 return 0; 165 } 166 167 memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); 168 q->head = (q->head + 1) % prot_queue_capacity(q); 169 q->count--; 170 171 pthread_mutex_unlock(&q->mutex); 172 return 1; 173 } 174 175 /* 176 * Wait until we have elements, and then pop multiple elements from the queue 177 * up to the specified maximum. 178 * 179 * Params: 180 * q - Pointer to the queue. 181 * buffer - Pointer to the buffer where popped data will be stored. 182 * max_items - Maximum number of items to pop from the queue. 183 * Returns the actual number of items popped. 184 */ 185 static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) { 186 pthread_mutex_lock(&q->mutex); 187 188 // Wait until there's at least one item to pop 189 while (q->count == 0) { 190 pthread_cond_wait(&q->cond, &q->mutex); 191 } 192 193 int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; 194 int items_to_pop = min(q->count, max_items); 195 items_to_pop = min(items_to_pop, items_until_end); 196 197 memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); 198 q->head = (q->head + items_to_pop) % prot_queue_capacity(q); 199 q->count -= items_to_pop; 200 201 pthread_mutex_unlock(&q->mutex); 202 203 return items_to_pop; 204 } 205 206 /* 207 * Pop an element from the queue. Blocks if the queue is empty. 208 * Params: 209 * q - Pointer to the queue. 210 * data - Pointer to where the popped data will be stored. 211 */ 212 static inline void prot_queue_pop(struct prot_queue *q, void *data) { 213 pthread_mutex_lock(&q->mutex); 214 215 while (q->count == 0) 216 pthread_cond_wait(&q->cond, &q->mutex); 217 218 memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); 219 q->head = (q->head + 1) % prot_queue_capacity(q); 220 q->count--; 221 222 pthread_mutex_unlock(&q->mutex); 223 } 224 225 /* 226 * Destroy the queue. Releases resources associated with the queue. 227 * Params: 228 * q - Pointer to the queue. 229 */ 230 static inline void prot_queue_destroy(struct prot_queue* q) { 231 pthread_mutex_destroy(&q->mutex); 232 pthread_cond_destroy(&q->cond); 233 } 234 235 #endif // PROT_QUEUE_H