commit 8636e48ffa2852a922609d070f2e2af347236a45
parent 2c71e2315dbd5a1ce51147d85c04f12cf6f4d8aa
Author: William Casarin <jb55@jb55.com>
Date: Fri, 11 Aug 2023 19:20:43 -0700
ndb: parse profiles, index notes using clustered keys
Diffstat:
M | Makefile | | | 7 | ++++--- |
M | nostrdb.c | | | 283 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- |
2 files changed, 256 insertions(+), 34 deletions(-)
diff --git a/Makefile b/Makefile
@@ -1,9 +1,10 @@
CFLAGS = -Wall -Wno-unused-function -Werror -O2 -g -Ideps/secp256k1/include -Ideps/lmdb -Ideps/flatcc/include
HEADERS = sha256.h nostrdb.h cursor.h hex.h jsmn.h config.h sha256.h random.h memchr.h
-SRCS = nostrdb.c sha256.c
-LDS = $(SRCS) $(ARS)
+FLATCC_SRCS=deps/flatcc/src/runtime/json_parser.c deps/flatcc/src/runtime/builder.c deps/flatcc/src/runtime/emitter.c deps/flatcc/src/runtime/refmap.c
+SRCS = nostrdb.c sha256.c $(FLATCC_SRCS)
+LDS = $(ARS) $(SRCS)
DEPS = $(SRCS) $(HEADERS) $(ARS)
-ARS = deps/lmdb/liblmdb.a deps/secp256k1/.libs/libsecp256k1.a
+ARS = deps/lmdb/liblmdb.a deps/secp256k1/.libs/libsecp256k1.a
LMDB_VER=0.9.31
FLATCC_VER=0.6.1
PREFIX ?= /usr/local
diff --git a/nostrdb.c b/nostrdb.c
@@ -14,6 +14,7 @@
#include <limits.h>
#include <assert.h>
+#include "bindings/c/profile_json_parser.h"
#include "secp256k1.h"
#include "secp256k1_ecdh.h"
#include "secp256k1_schnorrsig.h"
@@ -49,9 +50,13 @@ struct ndb_ingest_controller
};
enum ndb_dbs {
- NDB_DBI_ID,
+ NDB_DB_NOTE,
+ NDB_DB_META,
+ NDB_DB_PROFILE,
+ NDB_DB_NOTE_ID,
+ NDB_DB_PROFILE_PK,
+ NDB_DBS,
};
-#define NDB_DBIS 1
struct ndb_json_parser {
const char *json;
@@ -66,7 +71,7 @@ struct ndb_json_parser {
// useful to pass to threads on its own
struct ndb_lmdb {
MDB_env *env;
- MDB_dbi dbis[NDB_DBIS];
+ MDB_dbi dbs[NDB_DBS];
};
struct ndb_writer {
@@ -161,8 +166,9 @@ enum ndb_ingester_msgtype {
};
enum ndb_writer_msgtype {
- NDB_WRITER_NOTE, // write a note to the db
NDB_WRITER_QUIT, // kill thread immediately
+ NDB_WRITER_NOTE, // write a note to the db
+ NDB_WRITER_PROFILE, // write a profile to the db
};
struct ndb_ingester_event {
@@ -175,6 +181,12 @@ struct ndb_writer_note {
size_t note_len;
};
+struct ndb_writer_profile {
+ struct ndb_writer_note note;
+ void *profile_flatbuf;
+ size_t profile_len;
+};
+
struct ndb_ingester_msg {
enum ndb_ingester_msgtype type;
union {
@@ -186,6 +198,7 @@ struct ndb_writer_msg {
enum ndb_writer_msgtype type;
union {
struct ndb_writer_note note;
+ struct ndb_writer_profile profile;
};
};
@@ -225,26 +238,83 @@ static int ndb_writer_queue_note(struct ndb_writer *writer,
return prot_queue_push(&writer->inbox, &msg);
}
+static uint64_t ndb_get_note_by_id(MDB_txn *txn, struct ndb_lmdb *lmdb,
+ unsigned char *id)
+{
+ MDB_val key, data;
+ MDB_cursor *cur;
+ struct ndb_tsid tsid;
+
+ ndb_tsid_high(&tsid, id);
+ key.mv_data = &tsid;
+ key.mv_size = sizeof(tsid);
+
+ mdb_cursor_open(txn, lmdb->dbs[NDB_DB_NOTE_ID], &cur);
+
+ // Position cursor at the next key greater than or equal to the specified key
+ if (mdb_cursor_get(cur, &key, &data, MDB_SET_RANGE)) {
+ mdb_cursor_close(cur);
+ return 0;
+ }
+
+ if (mdb_cursor_get(cur, &key, &data, MDB_PREV)) {
+ mdb_cursor_close(cur);
+ return 0;
+ }
+
+ mdb_cursor_close(cur);
+ if (memcmp(key.mv_data, id, 32) == 0)
+ return *((uint64_t*)data.mv_data);
+
+ return 0;
+}
+
static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
{
unsigned char id[32];
struct ndb_ingest_controller *c = data;
- int rc;
+ MDB_val key;
hex_decode(hexid, 64, id, sizeof(id));
// let's see if we already have it
- MDB_val key, val;
+
key.mv_size = 32;
key.mv_data = id;
- rc = mdb_get(c->read_txn, c->lmdb->dbis[NDB_DBI_ID], &key, &val);
- if (rc == MDB_NOTFOUND)
+ if (!ndb_get_note_by_id(c->read_txn, c->lmdb, id))
return NDB_IDRES_CONT;
return NDB_IDRES_STOP;
}
+
+static int ndb_process_profile_note(struct ndb_note *note, void **profile,
+ size_t *profile_len)
+{
+ int res;
+
+ flatcc_builder_t builder;
+ flatcc_json_parser_t json_parser;
+
+ flatcc_builder_init(&builder);
+
+ //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note));
+ res = profile_parse_json(&builder, &json_parser,
+ ndb_note_content(note),
+ note->content_length,
+ flatcc_json_parser_f_skip_unknown);
+
+ if (res != 0) {
+ ndb_debug("profile_parse_json failed %d '%.*s'\n", res,
+ note->content_length, ndb_note_content(note));
+ return 0;
+ }
+
+ *profile = flatcc_builder_finalize_aligned_buffer(&builder, profile_len);
+ return 1;
+}
+
static int ndb_ingester_process_event(secp256k1_context *ctx,
struct ndb_ingester *ingester,
struct ndb_ingester_event *ev,
@@ -256,8 +326,8 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
struct ndb_note *note;
struct ndb_ingest_controller controller;
struct ndb_id_cb cb;
- void *buf;
- size_t bufsize, note_size;
+ void *buf, *flatbuf;
+ size_t bufsize, note_size, profile_len;
// we will use this to check if we already have it in the DB during
// ID parsing
@@ -311,9 +381,18 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
// to the writer thread
note = realloc(note, note_size);
- out->type = NDB_WRITER_NOTE;
- out->note.note = note;
- out->note.note_len = note_size;
+ if (note->kind == 0 &&
+ ndb_process_profile_note(note, &flatbuf, &profile_len)) {
+ out->type = NDB_WRITER_PROFILE;
+ out->profile.note.note = note;
+ out->profile.note.note_len = note_size;
+ out->profile.profile_flatbuf = flatbuf;
+ out->profile.profile_len = profile_len;
+ } else {
+ out->type = NDB_WRITER_NOTE;
+ out->note.note = note;
+ out->note.note_len = note_size;
+ }
// there's nothing left to do with the original json, so free it
free(ev->json);
@@ -327,12 +406,121 @@ cleanup:
return 0;
}
+static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
+{
+ MDB_cursor *mc;
+ MDB_val key, val;
+
+ if (mdb_cursor_open(txn, db, &mc))
+ return 0;
+
+ if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) {
+ mdb_cursor_close(mc);
+ return 0;
+ }
+
+ mdb_cursor_close(mc);
+
+ assert(key.mv_size == 8);
+ return *((uint64_t*)key.mv_data);
+}
+
+static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
+ struct ndb_writer_profile *profile)
+{
+ uint64_t profile_key;
+ struct ndb_tsid tsid;
+ struct ndb_note *note;
+ int rc;
+
+ MDB_val key, val;
+ MDB_dbi profile_db, pk_db;
+
+ note = profile->note.note;
+
+ // get dbs
+ profile_db = lmdb->dbs[NDB_DB_PROFILE];
+ pk_db = lmdb->dbs[NDB_DB_PROFILE_PK];
+
+ // get new key
+ profile_key = ndb_get_last_key(txn, profile_db) + 1;
+
+ // write profile to profile store
+ key.mv_data = &profile_key;
+ key.mv_size = sizeof(profile_key);
+ val.mv_data = profile->profile_flatbuf;
+ val.mv_size = profile->profile_len;
+ //ndb_debug("profile_len %ld\n", profile->profile_len);
+
+ if ((rc = mdb_put(txn, profile_db, &key, &val, 0))) {
+ ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
+ return 0;
+ }
+
+ // write profile_pk + created_at index
+ ndb_tsid_init(&tsid, note->pubkey, note->created_at);
+
+ key.mv_data = &tsid;
+ key.mv_size = sizeof(tsid);
+ val.mv_data = &profile_key;
+ val.mv_size = sizeof(profile_key);
+
+ if ((rc = mdb_put(txn, pk_db, &key, &val, 0))) {
+ ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
+ profile_key, mdb_strerror(rc));
+ return 0;
+ }
+
+ return 1;
+}
+
+static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
+ struct ndb_writer_note *note)
+{
+ uint64_t note_key;
+ struct ndb_tsid tsid;
+ MDB_dbi note_db, id_db;
+ MDB_val key, val;
+
+ // get dbs
+ note_db = lmdb->dbs[NDB_DB_NOTE];
+ id_db = lmdb->dbs[NDB_DB_NOTE_ID];
+
+ // get new key
+ note_key = ndb_get_last_key(txn, note_db) + 1;
+
+ // write note to event store
+ key.mv_data = ¬e_key;
+ key.mv_size = sizeof(note_key);
+ val.mv_data = note->note;
+ val.mv_size = note->note_len;
+
+ if (mdb_put(txn, note_db, &key, &val, 0)) {
+ ndb_debug("write note to db failed\n");
+ return 0;
+ }
+
+ // write id index key clustered with created_at
+ ndb_tsid_init(&tsid, note->note->id, note->note->created_at);
+
+ key.mv_data = &tsid;
+ key.mv_size = sizeof(tsid);
+ val.mv_data = ¬e_key;
+ val.mv_size = sizeof(note_key);
+
+ if (mdb_put(txn, id_db, &key, &val, 0)) {
+ ndb_debug("write note id index to db failed\n");
+ return 0;
+ }
+
+ return note_key;
+}
+
static void *ndb_writer_thread(void *data)
{
struct ndb_writer *writer = data;
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
int i, popped, done, any_note;
- MDB_val key, val;
MDB_txn *txn;
done = 0;
@@ -344,9 +532,10 @@ static void *ndb_writer_thread(void *data)
any_note = 0;
for (i = 0 ; i < popped; i++) {
msg = &msgs[i];
- if (msg->type == NDB_WRITER_NOTE) {
- any_note = 1;
- break;
+ switch (msg->type) {
+ case NDB_WRITER_NOTE: any_note = 1; break;
+ case NDB_WRITER_PROFILE: any_note = 1; break;
+ case NDB_WRITER_QUIT: break;
}
}
@@ -360,23 +549,20 @@ static void *ndb_writer_thread(void *data)
for (i = 0; i < popped; i++) {
msg = &msgs[i];
+
switch (msg->type) {
case NDB_WRITER_QUIT:
// quits are handled before this
done = 1;
continue;
+ case NDB_WRITER_PROFILE:
+ ndb_write_note(writer->lmdb, txn, &msg->note);
+ // TODO: save note_key with profile
+ ndb_write_profile(writer->lmdb, txn, &msg->profile);
+ break;
case NDB_WRITER_NOTE:
- key.mv_size = 32;
- key.mv_data = msg->note.note->id;
- val.mv_size = msg->note.note_len;
- val.mv_data = msg->note.note;
-
- if (txn != NULL &&
- mdb_put(txn, writer->lmdb->dbis[NDB_DBI_ID],
- &key, &val, 0))
- {
- fprintf(stderr, "writer thread txn commit failed");
- }
+ ndb_write_note(writer->lmdb, txn, &msg->note);
+ break;
}
}
@@ -386,11 +572,16 @@ static void *ndb_writer_thread(void *data)
assert(false);
}
+
// free notes
for (i = 0; i < popped; i++) {
msg = &msgs[i];
if (msg->type == NDB_WRITER_NOTE)
free(msg->note.note);
+ else if (msg->type == NDB_WRITER_PROFILE) {
+ free(msg->profile.profile_flatbuf);
+ free(msg->profile.note.note);
+ }
}
}
@@ -566,7 +757,7 @@ static int ndb_init_lmdb(struct ndb_lmdb *lmdb, size_t mapsize)
return 0;
}
- if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBIS))) {
+ if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) {
fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
return 0;
}
@@ -576,16 +767,46 @@ static int ndb_init_lmdb(struct ndb_lmdb *lmdb, size_t mapsize)
return 0;
}
- // Initialize DBIs
+ // Initialize DBs
if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) {
fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc);
return 0;
}
- if ((rc = mdb_dbi_open(txn, "id", MDB_CREATE, &lmdb->dbis[NDB_DBI_ID]))) {
+ // note flatbuffer db
+ if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) {
+ fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc);
+ return 0;
+ }
+
+ // note metadata db
+ if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_META]))) {
+ fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc);
+ return 0;
+ }
+
+ // profile flatbuffer db
+ if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) {
+ fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc);
+ return 0;
+ }
+
+ // id+ts index flags
+ unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED;
+
+ // index dbs
+ if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) {
+ fprintf(stderr, "mdb_dbi_open id failed, error %d\n", rc);
+ return 0;
+ }
+ mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);
+
+ if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
fprintf(stderr, "mdb_dbi_open id failed, error %d\n", rc);
return 0;
}
+ mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);
+
// Commit the transaction
if ((rc = mdb_txn_commit(txn))) {