nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit 804df0bdf801c583e2f935361ab18edfb4f5261a
parent 91cadd58f63b9c00d22932fefec0c0a8d10278e7
Author: William Casarin <jb55@jb55.com>
Date:   Fri,  4 Aug 2023 16:26:21 -0700

queue: add protected queue implementation

Link: https://chat.openai.com/share/e4e17d6b-e664-44c1-b4bd-220ad8cb0df3
Co-authored-by: GPT4

Diffstat:
Aprotected_queue.h | 161+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtest.c | 114+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 275 insertions(+), 0 deletions(-)

diff --git a/protected_queue.h b/protected_queue.h @@ -0,0 +1,161 @@ +/* + * This header file provides a thread-safe queue implementation for generic + * data elements. It uses POSIX threads (pthreads) to ensure thread safety. + * The queue allows for pushing and popping elements, with the ability to + * block or non-block on pop operations. Users are responsible for providing + * memory for the queue buffer and ensuring its correct lifespan. + * + * Author: William Casarin + */ + +#ifndef PROT_QUEUE_H +#define PROT_QUEUE_H + +#include <pthread.h> +#include <stdbool.h> +#include <stddef.h> +#include <string.h> +#include "cursor.h" + +#define BUFFER_SIZE 100 + +/* + * The prot_queue structure represents a thread-safe queue that can hold + * generic data elements. + */ +struct prot_queue { + unsigned char *buf; + int buflen; + + int head; + int tail; + int count; + int elem_size; + + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + + +/* + * Initialize the queue. + * Params: + * q - Pointer to the queue. + * buf - Buffer for holding data elements. + * buflen - Length of the buffer. + * elem_size - Size of each data element. + * Returns 1 if successful, 0 otherwise. + */ +static inline int prot_queue_init(struct prot_queue* q, void* buf, int buflen, + int elem_size) +{ + // buffer elements must fit nicely in the buffer + if (buflen == 0 || buflen % elem_size != 0) + return 0; + + q->head = 0; + q->tail = 0; + q->count = 0; + q->buf = buf; + q->buflen = buflen; + q->elem_size = elem_size; + + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); + + return 1; +} + +/* + * Return the capacity of the queue. + * q - Pointer to the queue. + */ +static inline int prot_queue_capacity(struct prot_queue *q) { + return q->buflen / q->elem_size; +} + +/* + * Push an element onto the queue. + * Params: + * q - Pointer to the queue. + * data - Pointer to the data element to be pushed. + * + * Returns 1 if successful, 0 if the queue is full. + */ +static inline int prot_queue_push(struct prot_queue* q, void *data) +{ + int cap; + + pthread_mutex_lock(&q->mutex); + + cap = prot_queue_capacity(q); + if (q->count == cap) { + // only signal if the push was sucessful + pthread_mutex_unlock(&q->mutex); + return 0; + } + + memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size); + q->tail = (q->tail + 1) % cap; + q->count++; + + pthread_cond_signal(&q->cond); + pthread_mutex_unlock(&q->mutex); + + return 1; +} + +/* + * Try to pop an element from the queue without blocking. + * Params: + * q - Pointer to the queue. + * data - Pointer to where the popped data will be stored. + * Returns 1 if successful, 0 if the queue is empty. + */ +static inline int prot_queue_try_pop(struct prot_queue *q, void *data) { + pthread_mutex_lock(&q->mutex); + + if (q->count == 0) { + pthread_mutex_unlock(&q->mutex); + return 0; + } + + memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); + q->head = (q->head + 1) % prot_queue_capacity(q); + q->count--; + + pthread_mutex_unlock(&q->mutex); + return 1; +} + + +/* + * Pop an element from the queue. Blocks if the queue is empty. + * Params: + * q - Pointer to the queue. + * data - Pointer to where the popped data will be stored. + */ +static inline void prot_queue_pop(struct prot_queue *q, void *data) { + pthread_mutex_lock(&q->mutex); + + while (q->count == 0) + pthread_cond_wait(&q->cond, &q->mutex); + + memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); + q->head = (q->head + 1) % prot_queue_capacity(q); + q->count--; + + pthread_mutex_unlock(&q->mutex); +} + +/* + * Destroy the queue. Releases resources associated with the queue. + * Params: + * q - Pointer to the queue. + */ +static inline void prot_queue_destroy(struct prot_queue* q) { + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->cond); +} + +#endif // PROT_QUEUE_H diff --git a/test.c b/test.c @@ -2,6 +2,7 @@ #include "nostrdb.h" #include "hex.h" #include "io.h" +#include "protected_queue.h" #include <stdio.h> #include <assert.h> @@ -351,6 +352,112 @@ static void test_tce() { #undef JSON } +#define TEST_BUF_SIZE 10 // For simplicity + +static void test_queue_init_pop_push() { + struct prot_queue q; + int buffer[TEST_BUF_SIZE]; + int data; + + // Initialize + assert(prot_queue_init(&q, buffer, sizeof(buffer), sizeof(int)) == 1); + + // Push and Pop + data = 5; + assert(prot_queue_push(&q, &data) == 1); + prot_queue_pop(&q, &data); + assert(data == 5); + + // Push to full, and then fail to push + for (int i = 0; i < TEST_BUF_SIZE; i++) { + assert(prot_queue_push(&q, &i) == 1); + } + assert(prot_queue_push(&q, &data) == 0); // Should fail as queue is full + + // Pop to empty, and then fail to pop + for (int i = 0; i < TEST_BUF_SIZE; i++) { + assert(prot_queue_try_pop(&q, &data) == 1); + assert(data == i); + } + assert(prot_queue_try_pop(&q, &data) == 0); // Should fail as queue is empty +} + +// This function will be used by threads to test thread safety. +void* thread_func(void* arg) { + struct prot_queue* q = (struct prot_queue*) arg; + int data; + + for (int i = 0; i < 100; i++) { + data = i; + prot_queue_push(q, &data); + prot_queue_pop(q, &data); + } + return NULL; +} + +static void test_queue_thread_safety() { + struct prot_queue q; + int buffer[TEST_BUF_SIZE]; + pthread_t threads[2]; + + assert(prot_queue_init(&q, buffer, sizeof(buffer), sizeof(int)) == 1); + + // Create threads + for (int i = 0; i < 2; i++) { + pthread_create(&threads[i], NULL, thread_func, &q); + } + + // Join threads + for (int i = 0; i < 2; i++) { + pthread_join(threads[i], NULL); + } + + // After all operations, the queue should be empty + int data; + assert(prot_queue_try_pop(&q, &data) == 0); +} + +static void test_queue_boundary_conditions() { + struct prot_queue q; + int buffer[TEST_BUF_SIZE]; + int data; + + // Initialize + assert(prot_queue_init(&q, buffer, sizeof(buffer), sizeof(int)) == 1); + + // Push to full + for (int i = 0; i < TEST_BUF_SIZE; i++) { + assert(prot_queue_push(&q, &i) == 1); + } + + // Try to push to a full queue + int old_head = q.head; + int old_tail = q.tail; + int old_count = q.count; + assert(prot_queue_push(&q, &data) == 0); + + // Assert the queue's state has not changed + assert(old_head == q.head); + assert(old_tail == q.tail); + assert(old_count == q.count); + + // Pop to empty + for (int i = 0; i < TEST_BUF_SIZE; i++) { + assert(prot_queue_try_pop(&q, &data) == 1); + } + + // Try to pop from an empty queue + old_head = q.head; + old_tail = q.tail; + old_count = q.count; + assert(prot_queue_try_pop(&q, &data) == 0); + + // Assert the queue's state has not changed + assert(old_head == q.head); + assert(old_tail == q.tail); + assert(old_count == q.count); +} + int main(int argc, const char *argv[]) { test_basic_event(); test_empty_tags(); @@ -362,4 +469,11 @@ int main(int argc, const char *argv[]) { test_tce_eose(); test_tce_command_result_empty_msg(); test_content_len(); + + // protected queue tests + test_queue_init_pop_push(); + test_queue_thread_safety(); + test_queue_boundary_conditions(); + + printf("All tests passed!\n"); // Print this if all tests pass. }