commit e2dd7862ee67abdb437689953794124ec61f3aac
parent 0bbcdce17cb9fbfca9a89ad7f96536baaff06914
Author: William Casarin <jb55@jb55.com>
Date: Tue, 8 Aug 2023 14:59:36 -0700
ingester: switch single-thread to threadpool
Diffstat:
4 files changed, 41 insertions(+), 57 deletions(-)
diff --git a/nostrdb.c b/nostrdb.c
@@ -17,6 +17,12 @@
#include "secp256k1_ecdh.h"
#include "secp256k1_schnorrsig.h"
+// the maximum number of queue messages a thread pops or pushes in one batch
+static const int THREAD_QUEUE_BATCH = 1024;
+
+// the default capacity of each inbox queue, in number of elements (not bytes)
+static const int DEFAULT_QUEUE_SIZE = 50000;
+
struct ndb_json_parser {
const char *json;
int json_len;
@@ -36,11 +42,7 @@ struct ndb_writer {
};
struct ndb_ingester {
- secp256k1_context *ctx;
- void *queue_buf;
- int queue_buflen;
- pthread_t thread_id;
- struct prot_queue inbox;
+ struct threadpool tp;
struct ndb_writer *writer;
};
@@ -187,13 +189,12 @@ cleanup:
static void *ndb_writer_thread(void *data)
{
- static const int max_pop = 1024;
struct ndb_writer *writer = data;
- struct ndb_writer_msg msgs[max_pop], *msg;
+ struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
int i, popped;
while (true) {
- popped = prot_queue_pop_all(&writer->inbox, msgs, max_pop);
+ popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
ndb_debug("popped %d items in the writer thread\n", popped);
// look for a quit message to quit ASAP
@@ -221,19 +222,19 @@ cleanup:
static void *ndb_ingester_thread(void *data)
{
- static const int max_pop = 1024;
-
- // 1mb scratch buffer for
- struct ndb_ingester *ingester = data;
- struct ndb_ingester_msg msgs[max_pop], *msg;
- struct ndb_writer_msg outs[max_pop], *out;
- int i, to_write, popped;
secp256k1_context *ctx;
+ struct thread *thread = data;
+ struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
+ struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
+ struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
+ int i, to_write, popped;
+
ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
+ ndb_debug("started ingester thread\n");
while (true) {
to_write = 0;
- popped = prot_queue_pop_all(&ingester->inbox, msgs, max_pop);
+ popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
ndb_debug("popped %d items in the ingester thread\n", popped);
// look for a quit message to quit ASAP
@@ -272,7 +273,7 @@ cleanup:
static int ndb_writer_init(struct ndb_writer *writer)
{
// make the queue big!
- writer->queue_buflen = sizeof(struct ndb_writer_msg) * 1000000;
+ writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
writer->queue_buf = malloc(writer->queue_buflen);
if (writer->queue_buf == NULL) {
fprintf(stderr, "ndb: failed to allocate space for writer queue");
@@ -295,30 +296,24 @@ static int ndb_writer_init(struct ndb_writer *writer)
// initialize the ingester queue and then spawn the thread
static int ndb_ingester_init(struct ndb_ingester *ingester,
- struct ndb_writer *writer)
+ struct ndb_writer *writer, int num_threads)
{
- ingester->writer = writer;
+ int elem_size, num_elems;
+ static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT };
- // make the queue big!
- ingester->queue_buflen = sizeof(struct ndb_ingester_msg) * 1000000;
- ingester->queue_buf = malloc(ingester->queue_buflen);
- if (ingester->queue_buf == NULL) {
- fprintf(stderr, "failed to allocate space for ingester queue");
- return 0;
- }
+ // TODO: configurable queue sizes
+ elem_size = sizeof(struct ndb_ingester_msg);
+ num_elems = DEFAULT_QUEUE_SIZE;
- // init the ingester queue.
- prot_queue_init(&ingester->inbox, ingester->queue_buf,
- ingester->queue_buflen, sizeof(struct ndb_ingester_msg));
+ ingester->writer = writer;
- // spin up the thread
- if (pthread_create(&ingester->thread_id, NULL, ndb_ingester_thread,
- ingester))
+ if (!threadpool_init(&ingester->tp, num_threads, elem_size, num_elems,
+ &quit_msg, ingester, ndb_ingester_thread))
{
- fprintf(stderr, "ndb ingester thread failed to create\n");
+ fprintf(stderr, "ndb ingester threadpool failed to init\n");
return 0;
}
-
+
return 1;
}
@@ -345,22 +340,7 @@ static int ndb_writer_destroy(struct ndb_writer *writer)
static int ndb_ingester_destroy(struct ndb_ingester *ingester)
{
- struct ndb_ingester_msg msg;
-
- // kill thread
- msg.type = NDB_INGEST_QUIT;
- if (!prot_queue_push(&ingester->inbox, &msg)) {
- // queue is too full to push quit message. just kill it.
- pthread_exit(ingester->thread_id);
- } else {
- pthread_join(ingester->thread_id, NULL);
- }
-
- // cleanup
- prot_queue_destroy(&ingester->inbox);
-
- free(ingester->queue_buf);
-
+ threadpool_destroy(&ingester->tp);
return 1;
}
@@ -373,7 +353,7 @@ static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
msg.event.json = json;
msg.event.len = len;
- return prot_queue_push(&ingester->inbox, &msg);
+ return threadpool_dispatch(&ingester->tp, &msg);
}
static void ndb_make_id_ts(unsigned char *id, uint32_t created,
@@ -383,7 +363,7 @@ static void ndb_make_id_ts(unsigned char *id, uint32_t created,
ts->created = created;
}
-int ndb_init(struct ndb **pndb, size_t mapsize)
+int ndb_init(struct ndb **pndb, size_t mapsize, int ingester_threads)
{
struct ndb *ndb;
//MDB_dbi ind_id; // TODO: ind_pk, etc
@@ -413,7 +393,7 @@ int ndb_init(struct ndb **pndb, size_t mapsize)
if (!ndb_writer_init(&ndb->writer))
return 0;
- if (!ndb_ingester_init(&ndb->ingester, &ndb->writer))
+ if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads))
return 0;
// Initialize LMDB environment and spin up threads
diff --git a/nostrdb.h b/nostrdb.h
@@ -138,7 +138,7 @@ int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair);
int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[32], unsigned char signature[64]);
// NDB
-int ndb_init(struct ndb **ndb, size_t mapsize);
+int ndb_init(struct ndb **ndb, size_t mapsize, int ingester_threads);
int ndb_process_event(struct ndb *, const char *json, int len);
void ndb_destroy(struct ndb *);
diff --git a/protected_queue.h b/protected_queue.h
@@ -6,6 +6,7 @@
* memory for the queue buffer and ensuring its correct lifespan.
*
* Author: William Casarin
+ * Inspired-by: https://github.com/hoytech/hoytech-cpp/blob/master/hoytech/protected_queue.h
*/
#ifndef PROT_QUEUE_H
diff --git a/test.c b/test.c
@@ -15,14 +15,17 @@ static void test_lmdb_put()
struct ndb *ndb;
static const int alloc_size = 2 << 18;
char *json = malloc(alloc_size);
- int i, written;
+ int i, mapsize, written, ingester_threads;
+
+ mapsize = 1024 * 1024 * 100;
+ ingester_threads = 8;
// 256MB
- assert(ndb_init(&ndb, 2 << 28));
+ assert(ndb_init(&ndb, mapsize, ingester_threads));
read_file("testdata/contacts-event.json", (unsigned char*)json, alloc_size, &written);
- for (i = 0; i < 6000; i++) {
+ for (i = 0; i < 50000; i++) {
ndb_process_event(ndb, json, written);
}