nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit 62b666aa690994b8929156bdc34bef88e9f5e64a
parent d23b0e3d8d3e079bd215673626de1a646991b7c5
Author: William Casarin <jb55@jb55.com>
Date:   Mon,  7 Aug 2023 15:35:36 -0700

queue: add pop_all

allows you to pop many things from the queue at once

Diffstat:
Mprotected_queue.h | 39+++++++++++++++++++++++++++++++++++----
1 file changed, 35 insertions(+), 4 deletions(-)

diff --git a/protected_queue.h b/protected_queue.h @@ -16,6 +16,7 @@ #include <stddef.h> #include <string.h> #include "cursor.h" +#include "util.h" #define BUFFER_SIZE 100 @@ -25,7 +26,7 @@ */ struct prot_queue { unsigned char *buf; - int buflen; + size_t buflen; int head; int tail; @@ -46,8 +47,8 @@ struct prot_queue { * elem_size - Size of each data element. * Returns 1 if successful, 0 otherwise. */ -static inline int prot_queue_init(struct prot_queue* q, void* buf, int buflen, - int elem_size) +static inline int prot_queue_init(struct prot_queue* q, void* buf, + size_t buflen, int elem_size) { // buffer elements must fit nicely in the buffer if (buflen == 0 || buflen % elem_size != 0) @@ -70,7 +71,7 @@ static inline int prot_queue_init(struct prot_queue* q, void* buf, int buflen, * Return the capacity of the queue. * q - Pointer to the queue. */ -static inline int prot_queue_capacity(struct prot_queue *q) { +static inline size_t prot_queue_capacity(struct prot_queue *q) { return q->buflen / q->elem_size; } @@ -128,6 +129,36 @@ static inline int prot_queue_try_pop(struct prot_queue *q, void *data) { return 1; } +/* + * Wait until we have elements, and then pop multiple elements from the queue + * up to the specified maximum. + * + * Params: + * q - Pointer to the queue. + * buffer - Pointer to the buffer where popped data will be stored. + * max_items - Maximum number of items to pop from the queue. + * Returns the actual number of items popped. + */ +static int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) { + pthread_mutex_lock(&q->mutex); + + // Wait until there's at least one item to pop + while (q->count == 0) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; + int items_to_pop = min(q->count, max_items); + items_to_pop = min(items_to_pop, items_until_end); + + memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); + q->head = (q->head + items_to_pop) % prot_queue_capacity(q); + q->count -= items_to_pop; + + pthread_mutex_unlock(&q->mutex); + + return items_to_pop; +} /* * Pop an element from the queue. Blocks if the queue is empty.