commit 70d8b9f3622e8a663e5853833be740794be8464d
parent 7a134ac03ee4ec8e639b5add57a46771c2669e1f
Author: William Casarin <jb55@jb55.com>
Date: Wed, 9 Aug 2023 21:03:22 -0700
ndb: actually write notes to DB
Before sending to the writer thread, we check the database to see if we
already have the record.
Diffstat:
M | nostrdb.c | | | 211 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------- |
M | test.c | | | 1 | + |
2 files changed, 162 insertions(+), 50 deletions(-)
diff --git a/nostrdb.c b/nostrdb.c
@@ -45,6 +45,8 @@ struct ndb_lmdb {
};
struct ndb_writer {
+ struct ndb_lmdb *lmdb;
+
void *queue_buf;
int queue_buflen;
pthread_t thread_id;
@@ -57,8 +59,9 @@ struct ndb_ingester {
struct ndb_writer *writer;
};
+
struct ndb {
- MDB_env *env;
+ struct ndb_lmdb lmdb;
struct ndb_ingester ingester;
struct ndb_writer writer;
// lmdb environ handles, etc
@@ -144,12 +147,19 @@ static int ndb_writer_queue_note(struct ndb_writer *writer,
static int ndb_ingester_process_event(secp256k1_context *ctx,
struct ndb_ingester *ingester,
struct ndb_ingester_event *ev,
- struct ndb_writer_msg *out)
+ struct ndb_writer_msg *out,
+ MDB_txn *read_txn
+ )
{
struct ndb_tce tce;
struct ndb_note *note;
+ struct ndb_lmdb *lmdb;
void *buf;
size_t bufsize, note_size;
+ int rc;
+
+ // we will use this to check if we already have it in the DB
+ lmdb = ingester->writer->lmdb;
// since we're going to be passing this allocated note to a different
// thread, we can't use thread-local buffers. just allocate a block
@@ -180,15 +190,28 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
goto cleanup;
}
- note = realloc(note, note_size);
+ // let's see if we already have it
+ MDB_val key, val;
+ key.mv_size = 32;
+ key.mv_data = note->id;
+ rc = mdb_get(read_txn, lmdb->dbis[NDB_DBI_ID], &key, &val);
- out->type = NDB_WRITER_NOTE;
- out->note.note = note;
- out->note.note_len = note_size;
+ if (rc == MDB_NOTFOUND) {
+ // we didn't find anything. let's send it
+ // to the writer thread
+ note = realloc(note, note_size);
- // there's nothing left to do with the original json, so free it
- free(ev->json);
- return 1;
+ out->type = NDB_WRITER_NOTE;
+ out->note.note = note;
+ out->note.note_len = note_size;
+
+ // there's nothing left to do with the original json, so free it
+ free(ev->json);
+ return 1;
+ }
+
+ // we already have it or there's an error, either way
+ // cleanup and return 0
}
cleanup:
@@ -202,31 +225,69 @@ static void *ndb_writer_thread(void *data)
{
struct ndb_writer *writer = data;
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
- int i, popped;
+ int i, popped, done, any_note;
+ MDB_val key, val;
+ MDB_txn *txn;
- while (true) {
+ done = 0;
+ while (!done) {
+ txn = NULL;
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
ndb_debug("popped %d items in the writer thread\n", popped);
- // look for a quit message to quit ASAP
+ any_note = 0;
+ for (i = 0 ; i < popped; i++) {
+ msg = &msgs[i];
+ if (msg->type == NDB_WRITER_NOTE) {
+ any_note = 1;
+ break;
+ }
+ }
+
+ if (any_note && mdb_txn_begin(writer->lmdb->env, NULL, 0, &txn))
+ {
+ fprintf(stderr, "writer thread txn_begin failed");
+ // should definitely not happen unless DB is full
+ // or something ?
+ assert(false);
+ }
+
for (i = 0; i < popped; i++) {
msg = &msgs[i];
- if (msg->type == NDB_WRITER_QUIT)
- goto cleanup;
+ switch (msg->type) {
+ case NDB_WRITER_QUIT:
+ // quits are handled before this
+ done = 1;
+ continue;
+ case NDB_WRITER_NOTE:
+ key.mv_size = 32;
+ key.mv_data = msg->note.note->id;
+ val.mv_size = msg->note.note_len;
+ val.mv_data = msg->note.note;
+
+ if (txn != NULL &&
+ mdb_put(txn, writer->lmdb->dbis[NDB_DBI_ID],
+ &key, &val, 0))
+ {
+ fprintf(stderr, "writer thread txn commit failed");
+ }
+ }
}
- switch (msg->type) {
- case NDB_WRITER_QUIT:
- // quits are handled before this
- ndb_debug("writer: unexpected quit message\n");
- goto cleanup;
- case NDB_WRITER_NOTE:
- //ndb_debug("writing note %ld bytes\n", msg->note.note_len);
- free(msg->note.note);
+ // commit writes
+ if (any_note && mdb_txn_commit(txn)) {
+ fprintf(stderr, "writer thread txn commit failed");
+ assert(false);
+ }
+
+ // free notes
+ for (i = 0; i < popped; i++) {
+ msg = &msgs[i];
+ if (msg->type == NDB_WRITER_NOTE)
+ free(msg->note.note);
}
}
-cleanup:
ndb_debug("quitting writer thread\n");
return NULL;
}
@@ -236,54 +297,67 @@ static void *ndb_ingester_thread(void *data)
secp256k1_context *ctx;
struct thread *thread = data;
struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
+ struct ndb_lmdb *lmdb = ingester->writer->lmdb;
struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
- int i, to_write, popped;
+ int i, to_write, popped, done, any_event;
+ MDB_txn *read_txn = NULL;
ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
ndb_debug("started ingester thread\n");
- while (true) {
+ done = 0;
+ while (!done) {
to_write = 0;
+ any_event = 0;
+
popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
ndb_debug("popped %d items in the ingester thread\n", popped);
- // look for a quit message to quit ASAP
- // TODO: should I drain the queue first ?
for (i = 0; i < popped; i++) {
msg = &msgs[i];
- if (msg->type == NDB_INGEST_QUIT)
- goto cleanup;
+ if (msg->type == NDB_INGEST_EVENT) {
+ any_event = 1;
+ break;
+ }
}
+ if (any_event)
+ mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn);
+
for (i = 0; i < popped; i++) {
msg = &msgs[i];
switch (msg->type) {
case NDB_INGEST_QUIT:
- // quits are handled before this
- ndb_debug("ingester: unexpected quit message\n");
- goto cleanup;
+ done = 1;
+ break;
+
case NDB_INGEST_EVENT:
- out = &outs[to_write++];
- ndb_ingester_process_event(ctx, ingester,
- &msg->event, out);
+ out = &outs[to_write];
+ if (ndb_ingester_process_event(ctx, ingester,
+ &msg->event, out,
+ read_txn)) {
+ to_write++;
+ }
}
}
+ if (any_event)
+ mdb_txn_abort(read_txn);
+
if (to_write > 0)
ndb_writer_queue_msgs(ingester->writer, outs, to_write);
}
-cleanup:
ndb_debug("quitting ingester thread\n");
secp256k1_context_destroy(ctx);
return NULL;
}
-static int ndb_writer_init(struct ndb_writer *writer)
+static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb)
{
- // make the queue big!
+ writer->lmdb = lmdb;
writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
writer->queue_buf = malloc(writer->queue_buflen);
if (writer->queue_buf == NULL) {
@@ -374,38 +448,75 @@ static void ndb_make_id_ts(unsigned char *id, uint32_t created,
ts->created = created;
}
-int ndb_init(struct ndb **pndb, size_t mapsize, int ingester_threads)
+static int ndb_init_lmdb(struct ndb_lmdb *lmdb, size_t mapsize)
{
- struct ndb *ndb;
- //MDB_dbi ind_id; // TODO: ind_pk, etc
int rc;
+ MDB_txn *txn;
- ndb = *pndb = calloc(1, sizeof(struct ndb));
- if (ndb == NULL) {
- fprintf(stderr, "ndb_init: malloc failed\n");
+ if ((rc = mdb_env_create(&lmdb->env))) {
+ fprintf(stderr, "mdb_env_create failed, error %d\n", rc);
return 0;
}
- if ((rc = mdb_env_create(&ndb->env))) {
- fprintf(stderr, "mdb_env_create failed, error %d\n", rc);
+ if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) {
+ fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
return 0;
}
- if ((rc = mdb_env_set_mapsize(ndb->env, mapsize))) {
+ if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBIS))) {
fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
return 0;
}
- if ((rc = mdb_env_open(ndb->env, "./testdata/db", 0, 0664))) {
+ if ((rc = mdb_env_open(lmdb->env, "./testdata/db", 0, 0664))) {
fprintf(stderr, "mdb_env_open failed, error %d\n", rc);
return 0;
}
- if (!ndb_writer_init(&ndb->writer))
+ // Initialize DBIs
+ if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) {
+ fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc);
+ return 0;
+ }
+
+ if ((rc = mdb_dbi_open(txn, "id", MDB_CREATE, &lmdb->dbis[NDB_DBI_ID]))) {
+ fprintf(stderr, "mdb_dbi_open id failed, error %d\n", rc);
return 0;
+ }
- if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads))
+ // Commit the transaction
+ if ((rc = mdb_txn_commit(txn))) {
+ fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
return 0;
+ }
+
+ return 1;
+}
+
+int ndb_init(struct ndb **pndb, size_t mapsize, int ingester_threads)
+{
+ struct ndb *ndb;
+ //MDB_dbi ind_id; // TODO: ind_pk, etc
+
+ ndb = *pndb = calloc(1, sizeof(struct ndb));
+ if (ndb == NULL) {
+ fprintf(stderr, "ndb_init: malloc failed\n");
+ return 0;
+ }
+
+ if (!ndb_init_lmdb(&ndb->lmdb, mapsize))
+ return 0;
+
+ if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) {
+ fprintf(stderr, "ndb_writer_init failed");
+ return 0;
+ }
+
+ if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads)) {
+ fprintf(stderr, "failed to initialize %d ingester thread(s)",
+ ingester_threads);
+ return 0;
+ }
// Initialize LMDB environment and spin up threads
return 1;
diff --git a/test.c b/test.c
@@ -30,6 +30,7 @@ static void test_lmdb_put()
}
free(json);
+
ndb_destroy(ndb);
}