nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit c799a785a9918c078c2c633ab843e88a5d46f770
parent 733303dce79d02d498a39fb7af07a3145e2c17c0
Author: William Casarin <jb55@jb55.com>
Date:   Tue,  8 Aug 2023 12:58:52 -0700

queue: write many messages to the queue at once

Diffstat:
MMakefile | 2+-
Mnostrdb.c | 28+++++++++++++++++++++-------
Mprotected_queue.h | 44+++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/Makefile b/Makefile @@ -19,7 +19,7 @@ all: lib bindings bindings: bindings-swift bindings-c check: test - ./test | uniq -c + ./test clean: rm -rf test bench bindings diff --git a/nostrdb.c b/nostrdb.c @@ -107,6 +107,13 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], return 1; } +static inline int ndb_writer_queue_msgs(struct ndb_writer *writer, + struct ndb_writer_msg *msgs, + int num_msgs) +{ + return prot_queue_push_all(&writer->inbox, msgs, num_msgs); +} + static int ndb_writer_queue_note(struct ndb_writer *writer, struct ndb_note *note, size_t note_len) { @@ -122,7 +129,8 @@ static int ndb_writer_queue_note(struct ndb_writer *writer, static int ndb_ingester_process_event(secp256k1_context *ctx, struct ndb_ingester *ingester, - struct ndb_ingester_event *ev) + struct ndb_ingester_event *ev, + struct ndb_writer_msg *out) { struct ndb_tce tce; struct ndb_note *note; @@ -160,10 +168,9 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, note = realloc(note, note_size); - if (!ndb_writer_queue_note(ingester->writer, note, note_size)) { - fprintf(stderr, "failed to queue note. queue full.\n"); - goto cleanup; - } + out->type = NDB_WRITER_NOTE; + out->note.note = note; + out->note.note_len = note_size; // there's nothing left to do with the original json, so free it free(ev->json); @@ -218,15 +225,18 @@ static void *ndb_ingester_thread(void *data) // 1mb scratch buffer for struct ndb_ingester *ingester = data; struct ndb_ingester_msg msgs[max_pop], *msg; - int i, popped; + struct ndb_writer_msg outs[max_pop], *out; + int i, to_write, popped; secp256k1_context *ctx; ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); while (true) { + to_write = 0; popped = prot_queue_pop_all(&ingester->inbox, msgs, max_pop); ndb_debug("popped %d items in the ingester thread\n", popped); // look for a quit message to quit ASAP + // TODO: should I drain the queue first ? for (i = 0; i < popped; i++) { msg = &msgs[i]; if (msg->type == NDB_INGEST_QUIT) @@ -241,10 +251,14 @@ static void *ndb_ingester_thread(void *data) ndb_debug("ingester: unexpected quit message\n"); goto cleanup; case NDB_INGEST_EVENT: + out = &outs[to_write++]; ndb_ingester_process_event(ctx, ingester, - &msg->event); + &msg->event, out); } } + + if (to_write > 0) + ndb_writer_queue_msgs(ingester->writer, outs, to_write); } cleanup: diff --git a/protected_queue.h b/protected_queue.h @@ -83,7 +83,7 @@ static inline size_t prot_queue_capacity(struct prot_queue *q) { * * Returns 1 if successful, 0 if the queue is full. */ -static inline int prot_queue_push(struct prot_queue* q, void *data) +static int prot_queue_push(struct prot_queue* q, void *data) { int cap; @@ -106,6 +106,48 @@ static inline int prot_queue_push(struct prot_queue* q, void *data) return 1; } +/* + * Push multiple elements onto the queue. + * Params: + * q - Pointer to the queue. + * data - Pointer to the data elements to be pushed. + * count - Number of elements to push. + * + * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space. + */ +static int prot_queue_push_all(struct prot_queue* q, void *data, int count) +{ + int cap; + int first_copy_count, second_copy_count; + + pthread_mutex_lock(&q->mutex); + + cap = prot_queue_capacity(q); + if (q->count + count > cap) { + pthread_mutex_unlock(&q->mutex); + return 0; // Return failure if the queue is full + } + + first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer + second_copy_count = count - first_copy_count; // Remaining elements if wrap around + + memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size); + q->tail = (q->tail + first_copy_count) % cap; + + if (second_copy_count > 0) { + // If there is a wrap around, copy the remaining elements + memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size); + q->tail = (q->tail + second_copy_count) % cap; + } + + q->count += count; + + pthread_cond_signal(&q->cond); // Signal a waiting thread + pthread_mutex_unlock(&q->mutex); + + return count; +} + /* * Try to pop an element from the queue without blocking. * Params: