commit 3fff947f57174e2e581b1f53718c5518c6066311
parent 62b666aa690994b8929156bdc34bef88e9f5e64a
Author: William Casarin <jb55@jb55.com>
Date: Mon, 7 Aug 2023 15:36:30 -0700
ndb: add threaded ingester and writer queues
This adds an ingester and writer queue to nostrdb.
Ingester: used for queueing json for processing
Writer: A single-thread for writing parsed notes to the database.
For now there is only one ingester thread, but in the future was can
expand this into a thread pool for max CPU utilization.
This also implements a share-nothing architecture similar to strfry.
These queues simply pass around owned pointers, thread contention is
minimized since locks held on queues are very short lived.
Diffstat:
M | nostrdb.c | | | 352 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- |
M | nostrdb.h | | | 9 | +++++++++ |
2 files changed, 351 insertions(+), 10 deletions(-)
diff --git a/nostrdb.c b/nostrdb.c
@@ -6,9 +6,11 @@
#include "random.h"
#include "sha256.h"
#include "lmdb.h"
+#include "util.h"
#include "protected_queue.h"
#include <stdlib.h>
#include <limits.h>
+#include <assert.h>
#include "secp256k1.h"
#include "secp256k1_ecdh.h"
@@ -24,13 +26,27 @@ struct ndb_json_parser {
int num_tokens;
};
+struct ndb_writer {
+ void *queue_buf;
+ int queue_buflen;
+ pthread_t thread_id;
+
+ struct prot_queue inbox;
+};
+
struct ndb_ingester {
+ secp256k1_context *ctx;
+ void *queue_buf;
+ int queue_buflen;
+ pthread_t thread_id;
struct prot_queue inbox;
+ struct ndb_writer *writer;
};
struct ndb {
MDB_env *env;
struct ndb_ingester ingester;
+ struct ndb_writer writer;
// lmdb environ handles, etc
};
@@ -40,6 +56,310 @@ struct ndb_id_ts {
uint32_t created;
};
+enum ndb_ingester_msgtype {
+ NDB_INGEST_EVENT, // write json to the ingester queue for processing
+ NDB_INGEST_QUIT, // kill ingester thread immediately
+};
+
+enum ndb_writer_msgtype {
+ NDB_WRITER_NOTE, // write a note to the db
+ NDB_WRITER_QUIT, // kill thread immediately
+};
+
+struct ndb_ingester_event {
+ const char *json;
+ int len;
+};
+
+struct ndb_writer_note {
+ struct ndb_note *note;
+ size_t note_len;
+};
+
+struct ndb_ingester_msg {
+ enum ndb_ingester_msgtype type;
+ union {
+ struct ndb_ingester_event event;
+ };
+};
+
+struct ndb_writer_msg {
+ enum ndb_writer_msgtype type;
+ union {
+ struct ndb_writer_note note;
+ };
+};
+
+int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
+ unsigned char sig[64])
+{
+ secp256k1_xonly_pubkey xonly_pubkey;
+ int ok;
+
+ ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey,
+ pubkey) != 0;
+ if (!ok) return 0;
+
+ ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32,
+ &xonly_pubkey) > 0;
+ if (!ok) return 0;
+
+ return 1;
+}
+
+static int ndb_writer_queue_note(struct ndb_writer *writer,
+ struct ndb_note *note, size_t note_len)
+{
+ struct ndb_writer_msg msg;
+ msg.type = NDB_WRITER_NOTE;
+
+ msg.note.note = note;
+ msg.note.note_len = note_len;
+
+ return prot_queue_push(&writer->inbox, &msg);
+}
+
+
+static int ndb_ingester_process_event(secp256k1_context *ctx,
+ struct ndb_ingester *ingester,
+ struct ndb_ingester_event *ev)
+{
+ struct ndb_tce tce;
+ struct ndb_note *note;
+ void *buf;
+ size_t bufsize, note_size;
+
+ // since we're going to be passing this allocated note to a different
+ // thread, we can't use thread-local buffers. just allocate a block
+ bufsize = max(ev->len * 8.0, 4096);
+ buf = malloc(bufsize);
+ if (!buf)
+ return 0;
+
+ note_size =
+ ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize);
+
+ switch (tce.evtype) {
+ case NDB_TCE_NOTICE: goto cleanup;
+ case NDB_TCE_EOSE: goto cleanup;
+ case NDB_TCE_OK: goto cleanup;
+ case NDB_TCE_EVENT:
+ note = tce.event.note;
+
+ // Verify! If it's an invalid note we don't need to bothter
+ // writing it to the database
+ if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
+ ndb_debug("signature verification failed\n");
+ goto cleanup;
+ }
+
+ if (note != buf) {
+ ndb_debug("note buffer not equal to malloc'd buffer\n");
+ goto cleanup;
+ }
+
+ note = realloc(note, note_size);
+
+ if (!ndb_writer_queue_note(ingester->writer, note, note_size)) {
+ fprintf(stderr, "failed to queue note. queue full.\n");
+ goto cleanup;
+ }
+
+ // there's nothing left to do with the original json, so free it
+ free((void*)ev->json);
+ return 1;
+ }
+
+cleanup:
+ free((void*)ev->json);
+ free(buf);
+
+ return 0;
+}
+
+static void *ndb_writer_thread(void *data)
+{
+ static const int max_pop = 1024;
+ struct ndb_writer *writer = data;
+ struct ndb_writer_msg msgs[max_pop], *msg;
+ int i, popped;
+
+ while (true) {
+ popped = prot_queue_pop_all(&writer->inbox, msgs, max_pop);
+ ndb_debug("popped %d items in the writer thread\n", popped);
+
+ // look for a quit message to quit ASAP
+ for (i = 0; i < popped; i++) {
+ msg = &msgs[i];
+ if (msg->type == NDB_WRITER_QUIT)
+ goto cleanup;
+ }
+
+ switch (msg->type) {
+ case NDB_WRITER_QUIT:
+ // quits are handled before this
+ ndb_debug("writer: unexpected quit message\n");
+ goto cleanup;
+ case NDB_WRITER_NOTE:
+ ndb_debug("writing note %ld bytes\n", msg->note.note_len);
+ }
+ }
+
+cleanup:
+ ndb_debug("quitting writer thread\n");
+ return NULL;
+}
+
+static void *ndb_ingester_thread(void *data)
+{
+ static const int max_pop = 1024;
+
+ // 1mb scratch buffer for
+ struct ndb_ingester *ingester = data;
+ struct ndb_ingester_msg msgs[max_pop], *msg;
+ int i, popped;
+ secp256k1_context *ctx;
+ ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
+
+ while (true) {
+ popped = prot_queue_pop_all(&ingester->inbox, msgs, max_pop);
+ ndb_debug("popped %d items in the ingester thread\n", popped);
+
+ // look for a quit message to quit ASAP
+ for (i = 0; i < popped; i++) {
+ msg = &msgs[i];
+ if (msg->type == NDB_INGEST_QUIT)
+ goto cleanup;
+ }
+
+ for (i = 0; i < popped; i++) {
+ msg = &msg[i];
+ switch (msg->type) {
+ case NDB_INGEST_QUIT:
+ // quits are handled before this
+ ndb_debug("ingester: unexpected quit message\n");
+ goto cleanup;
+ case NDB_INGEST_EVENT:
+ ndb_ingester_process_event(ctx, ingester,
+ &msg->event);
+ }
+ }
+ }
+
+cleanup:
+ ndb_debug("quitting ingester thread\n");
+ secp256k1_context_destroy(ctx);
+ return NULL;
+}
+
+
+static int ndb_writer_init(struct ndb_writer *writer)
+{
+ // make the queue big!
+ writer->queue_buflen = sizeof(struct ndb_writer_msg) * 1000000;
+ writer->queue_buf = malloc(writer->queue_buflen);
+ if (writer->queue_buf == NULL) {
+ fprintf(stderr, "ndb: failed to allocate space for writer queue");
+ return 0;
+ }
+
+ // init the writer queue.
+ prot_queue_init(&writer->inbox, writer->queue_buf,
+ writer->queue_buflen, sizeof(struct ndb_writer_msg));
+
+ // spin up the writer thread
+ if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer))
+ {
+ fprintf(stderr, "ndb writer thread failed to create\n");
+ return 0;
+ }
+
+ return 1;
+}
+
+// initialize the ingester queue and then spawn the thread
+static int ndb_ingester_init(struct ndb_ingester *ingester,
+ struct ndb_writer *writer)
+{
+ ingester->writer = writer;
+
+ // make the queue big!
+ ingester->queue_buflen = sizeof(struct ndb_ingester_msg) * 1000000;
+ ingester->queue_buf = malloc(ingester->queue_buflen);
+ if (ingester->queue_buf == NULL) {
+ fprintf(stderr, "failed to allocate space for ingester queue");
+ return 0;
+ }
+
+ // init the ingester queue.
+ prot_queue_init(&ingester->inbox, ingester->queue_buf,
+ ingester->queue_buflen, sizeof(struct ndb_ingester_msg));
+
+ // spin up the thread
+ if (pthread_create(&ingester->thread_id, NULL, ndb_ingester_thread,
+ ingester))
+ {
+ fprintf(stderr, "ndb ingester thread failed to create\n");
+ return 0;
+ }
+
+ return 1;
+}
+
+static int ndb_writer_destroy(struct ndb_writer *writer)
+{
+ struct ndb_writer_msg msg;
+
+ // kill thread
+ msg.type = NDB_WRITER_QUIT;
+ if (!prot_queue_push(&writer->inbox, &msg)) {
+ // queue is too full to push quit message. just kill it.
+ pthread_exit(writer->thread_id);
+ } else {
+ pthread_join(writer->thread_id, NULL);
+ }
+
+ // cleanup
+ prot_queue_destroy(&writer->inbox);
+
+ free(writer->queue_buf);
+
+ return 1;
+}
+
+static int ndb_ingester_destroy(struct ndb_ingester *ingester)
+{
+ struct ndb_ingester_msg msg;
+
+ // kill thread
+ msg.type = NDB_INGEST_QUIT;
+ if (!prot_queue_push(&ingester->inbox, &msg)) {
+ // queue is too full to push quit message. just kill it.
+ pthread_exit(ingester->thread_id);
+ } else {
+ pthread_join(ingester->thread_id, NULL);
+ }
+
+ // cleanup
+ prot_queue_destroy(&ingester->inbox);
+
+ free(ingester->queue_buf);
+
+ return 1;
+}
+
+static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
+ const char *json, int len)
+{
+ struct ndb_ingester_msg msg;
+ msg.type = NDB_INGEST_EVENT;
+
+ msg.event.json = json;
+ msg.event.len = len;
+
+ return prot_queue_push(&ingester->inbox, &msg);
+}
+
static void ndb_make_id_ts(unsigned char *id, uint32_t created,
struct ndb_id_ts *ts)
{
@@ -47,33 +367,39 @@ static void ndb_make_id_ts(unsigned char *id, uint32_t created,
ts->created = created;
}
-int ndb_init(struct ndb **ndb, size_t mapsize)
+int ndb_init(struct ndb **pndb, size_t mapsize)
{
- struct ndb *db;
+ struct ndb *ndb;
//MDB_dbi ind_id; // TODO: ind_pk, etc
int rc;
- db = *ndb = calloc(1, sizeof(struct ndb));
- if (*ndb == NULL) {
+ ndb = *pndb = calloc(1, sizeof(struct ndb));
+ if (ndb == NULL) {
fprintf(stderr, "ndb_init: malloc failed\n");
return 0;
}
- if ((rc = mdb_env_create(&db->env))) {
+ if ((rc = mdb_env_create(&ndb->env))) {
fprintf(stderr, "mdb_env_create failed, error %d\n", rc);
return 0;
}
- if ((rc = mdb_env_set_mapsize(db->env, mapsize))) {
+ if ((rc = mdb_env_set_mapsize(ndb->env, mapsize))) {
fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
return 0;
}
- if ((rc = mdb_env_open(db->env, "./testdata/db", 0, 0664))) {
+ if ((rc = mdb_env_open(ndb->env, "./testdata/db", 0, 0664))) {
fprintf(stderr, "mdb_env_open failed, error %d\n", rc);
return 0;
}
+ if (!ndb_writer_init(&ndb->writer))
+ return 0;
+
+ if (!ndb_ingester_init(&ndb->ingester, &ndb->writer))
+ return 0;
+
// Initialize LMDB environment and spin up threads
return 1;
}
@@ -84,6 +410,11 @@ void ndb_destroy(struct ndb *ndb)
return;
mdb_env_close(ndb->env);
+
+ // ingester depends on writer and must be destroyed first
+ ndb_ingester_destroy(&ndb->ingester);
+ ndb_writer_destroy(&ndb->writer);
+
free(ndb);
}
@@ -110,11 +441,12 @@ int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
// have our thread that manages a websocket connection, we can
// avoid the copy and just use the buffer we get from that
// thread.
- char *json_copy = strdup(json);
+ char *json_copy = strdupn(json, json_len);
if (json_copy == NULL)
return 0;
- return 0;
+ ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len);
+ return 1;
}
static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
@@ -449,7 +781,7 @@ int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note,
unsigned char *end = builder->mem.end;
unsigned char *start = (unsigned char*)(*note) + total_size;
- ndb_builder_set_pubkey(builder, keypair->pubkey);
+ ndb_builder_set_pubkey(builder, keypair->pubkey);
if (!ndb_calculate_id(builder->note, start, end - start))
return 0;
diff --git a/nostrdb.h b/nostrdb.h
@@ -7,6 +7,14 @@
#define NDB_PACKED_STR 0x1
#define NDB_PACKED_ID 0x2
+#define DEBUG 1
+
+#ifdef DEBUG
+#define ndb_debug(...) printf(__VA_ARGS__)
+#else
+#define ndb_debug(...) (void)0
+#endif
+
struct ndb_json_parser;
struct ndb;
@@ -127,6 +135,7 @@ int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen);
int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], unsigned char sig[64]);
int ndb_create_keypair(struct ndb_keypair *key);
int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair);
+int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[32], unsigned char signature[64]);
// NDB
int ndb_init(struct ndb **ndb, size_t mapsize);