commit 7361512ece1648e04536e3fdbbe7753c02590f95
parent 26b1f480fc4b1c0711668c5d972b402cb91adf87
Author: William Casarin <jb55@jb55.com>
Date: Sat, 7 Oct 2023 21:12:29 -0700
txn: refactor nostrdb to use ndb_txn in more places internally
Diffstat:
M | nostrdb.c | | | 143 | +++++++++++++++++++++++++++++++++++++++++++++++-------------------------------- |
M | nostrdb.h | | | 4 | ++-- |
2 files changed, 87 insertions(+), 60 deletions(-)
diff --git a/nostrdb.c b/nostrdb.c
@@ -155,8 +155,7 @@ static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
key->search[sizeof(key->search) - 1] = '\0';
}
-static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb,
- MDB_txn *txn,
+static int ndb_write_profile_search_index(struct ndb_txn *txn,
struct ndb_search_key *index_key,
uint64_t profile_key)
{
@@ -168,7 +167,9 @@ static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb,
val.mv_data = &profile_key;
val.mv_size = sizeof(profile_key);
- if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
+ &key, &val, 0)))
+ {
ndb_debug("ndb_write_profile_search_index failed: %s\n",
mdb_strerror(rc));
return 0;
@@ -179,8 +180,7 @@ static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb,
// map usernames and display names to profile keys for user searching
-static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb,
- MDB_txn *txn,
+static int ndb_write_profile_search_indices(struct ndb_txn *txn,
struct ndb_note *note,
uint64_t profile_key,
void *profile_root)
@@ -199,8 +199,7 @@ static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb,
if (name) {
ndb_make_search_key(&index, note->pubkey, note->created_at,
name);
- if (!ndb_write_profile_search_index(lmdb, txn, &index,
- profile_key))
+ if (!ndb_write_profile_search_index(txn, &index, profile_key))
return 0;
}
@@ -211,19 +210,30 @@ static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb,
}
ndb_make_search_key(&index, note->pubkey, note->created_at,
display_name);
- if (!ndb_write_profile_search_index(lmdb, txn, &index,
- profile_key))
+ if (!ndb_write_profile_search_index(txn, &index, profile_key))
return 0;
}
return 1;
}
-int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
+
+static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags)
{
- txn->ndb = ndb;
+ txn->lmdb = &ndb->lmdb;
MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn;
- return mdb_txn_begin(ndb->lmdb.env, NULL, 0, mdb_txn) == 0;
+ return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0;
+}
+
+int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
+{
+ return _ndb_begin_query(ndb, txn, MDB_RDONLY);
+}
+
+// this should only be used in migrations, etc
+static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn)
+{
+ return _ndb_begin_query(ndb, txn, 0);
}
@@ -243,8 +253,8 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
size_t len;
int count;
- if (!ndb_begin_query(ndb, &txn)) {
- fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_query failed\n");
+ if (!ndb_begin_rw_query(ndb, &txn)) {
+ fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n");
return 0;
}
@@ -268,8 +278,7 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
return 0;
}
- if (!ndb_write_profile_search_indices(&ndb->lmdb, txn.mdb_txn,
- note, profile_key,
+ if (!ndb_write_profile_search_indices(&txn, note, profile_key,
profile_root)) {
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
@@ -282,7 +291,8 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
fprintf(stderr, "migrated %d profiles to include search indices\n", count);
mdb_cursor_close(cur);
- mdb_txn_commit(txn.mdb_txn);
+
+ ndb_end_query(&txn);
return 1;
}
@@ -457,9 +467,10 @@ struct ndb_writer_msg {
};
};
-void ndb_end_query(struct ndb_txn *txn)
+int ndb_end_query(struct ndb_txn *txn)
{
- mdb_txn_abort(txn->mdb_txn);
+ // this works on read or write queries.
+ return mdb_txn_commit(txn->mdb_txn) == 0;
}
int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
@@ -504,7 +515,7 @@ static int ndb_writer_queue_note(struct ndb_writer *writer,
return prot_queue_push(&writer->inbox, &msg);
}
-static void ndb_writer_last_profile_fetch(struct ndb_lmdb *lmdb, MDB_txn *txn,
+static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
struct ndb_writer_last_fetch *w)
{
int rc;
@@ -515,7 +526,9 @@ static void ndb_writer_last_profile_fetch(struct ndb_lmdb *lmdb, MDB_txn *txn,
val.mv_data = &w->fetched_at;
val.mv_size = sizeof(w->fetched_at);
- if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH],
+ &key, &val, 0)))
+ {
ndb_debug("write version to ndb_meta failed: %s\n",
mdb_strerror(rc));
return;
@@ -536,8 +549,8 @@ int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
}
// get some value based on a clustered id key
-int ndb_get_tsid(MDB_txn *txn, struct ndb_lmdb *lmdb, enum ndb_dbs db,
- const unsigned char *id, MDB_val *val)
+int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id,
+ MDB_val *val)
{
MDB_val k, v;
MDB_cursor *cur;
@@ -550,7 +563,7 @@ int ndb_get_tsid(MDB_txn *txn, struct ndb_lmdb *lmdb, enum ndb_dbs db,
k.mv_data = &tsid;
k.mv_size = sizeof(tsid);
- mdb_cursor_open(txn, lmdb->dbs[db], &cur);
+ mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur);
// Position cursor at the next key greater than or equal to the specified key
if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) {
@@ -582,7 +595,7 @@ static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key,
k.mv_data = &key;
k.mv_size = sizeof(key);
- if (mdb_get(txn->mdb_txn, txn->ndb->lmdb.dbs[store], &k, &v)) {
+ if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
return NULL;
}
@@ -602,7 +615,7 @@ static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
if (len)
*len = 0;
- if (!ndb_get_tsid(txn->mdb_txn, &txn->ndb->lmdb, ind, pk, &k)) {
+ if (!ndb_get_tsid(txn, ind, pk, &k)) {
//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
return 0;
}
@@ -610,7 +623,7 @@ static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
if (primkey)
*primkey = *(uint64_t*)k.mv_data;
- if (mdb_get(txn->mdb_txn, txn->ndb->lmdb.dbs[store], &k, &v)) {
+ if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
return 0;
}
@@ -638,7 +651,7 @@ static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn,
{
MDB_val k;
- if (!ndb_get_tsid(txn->mdb_txn, &txn->ndb->lmdb, db, id, &k))
+ if (!ndb_get_tsid(txn, db, id, &k))
return 0;
return *(uint32_t*)k.mv_data;
@@ -675,26 +688,35 @@ uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key)
}
-static int ndb_has_note(MDB_txn *txn, struct ndb_lmdb *lmdb, const unsigned char *id)
+static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id)
{
MDB_val val;
- if (!ndb_get_tsid(txn, lmdb, NDB_DB_NOTE_ID, id, &val))
+ if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val))
return 0;
return 1;
}
+static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb,
+ MDB_txn *mdb_txn)
+{
+ txn->lmdb = lmdb;
+ txn->mdb_txn = mdb_txn;
+}
+
static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
{
unsigned char id[32];
struct ndb_ingest_controller *c = data;
+ struct ndb_txn txn;
hex_decode(hexid, 64, id, sizeof(id));
// let's see if we already have it
- if (!ndb_has_note(c->read_txn, c->lmdb, id))
+ ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
+ if (!ndb_has_note(&txn, id))
return NDB_IDRES_CONT;
return NDB_IDRES_STOP;
@@ -957,8 +979,8 @@ int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const cha
k.mv_size = sizeof(s);
if ((rc = mdb_cursor_open(txn->mdb_txn,
- txn->ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH],
- cursor))) {
+ txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
+ cursor))) {
printf("search_profile: cursor opened failed: %s\n",
mdb_strerror(rc));
return 0;
@@ -1040,7 +1062,7 @@ static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
return 0;
}
-static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
+static int ndb_write_profile(struct ndb_txn *txn,
struct ndb_writer_profile *profile,
uint64_t note_key)
{
@@ -1071,11 +1093,11 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
// get dbs
- profile_db = lmdb->dbs[NDB_DB_PROFILE];
- pk_db = lmdb->dbs[NDB_DB_PROFILE_PK];
+ profile_db = txn->lmdb->dbs[NDB_DB_PROFILE];
+ pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK];
// get new key
- profile_key = ndb_get_last_key(txn, profile_db) + 1;
+ profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1;
// write profile to profile store
key.mv_data = &profile_key;
@@ -1084,7 +1106,7 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
val.mv_size = flatbuf_len;
//ndb_debug("profile_len %ld\n", profile->profile_len);
- if ((rc = mdb_put(txn, profile_db, &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) {
ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
return 0;
}
@@ -1097,14 +1119,14 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
val.mv_data = &profile_key;
val.mv_size = sizeof(profile_key);
- if ((rc = mdb_put(txn, pk_db, &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) {
ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
profile_key, mdb_strerror(rc));
return 0;
}
// write name, display_name profile search indices
- if (!ndb_write_profile_search_indices(lmdb, txn, note, profile_key,
+ if (!ndb_write_profile_search_indices(txn, note, profile_key,
flatbuf)) {
ndb_debug("failed to write profile search indices\n");
return 0;
@@ -1113,7 +1135,7 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn,
return 1;
}
-static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
+static uint64_t ndb_write_note(struct ndb_txn *txn,
struct ndb_writer_note *note)
{
int rc;
@@ -1123,11 +1145,11 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
MDB_val key, val;
// get dbs
- note_db = lmdb->dbs[NDB_DB_NOTE];
- id_db = lmdb->dbs[NDB_DB_NOTE_ID];
+ note_db = txn->lmdb->dbs[NDB_DB_NOTE];
+ id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
// get new key
- note_key = ndb_get_last_key(txn, note_db) + 1;
+ note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1;
// write note to event store
key.mv_data = ¬e_key;
@@ -1135,7 +1157,7 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
val.mv_data = note->note;
val.mv_size = note->note_len;
- if ((rc = mdb_put(txn, note_db, &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) {
ndb_debug("write note to db failed: %s\n", mdb_strerror(rc));
return 0;
}
@@ -1148,17 +1170,21 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
val.mv_data = ¬e_key;
val.mv_size = sizeof(note_key);
- if ((rc = mdb_put(txn, id_db, &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) {
ndb_debug("write note id index to db failed: %s\n",
mdb_strerror(rc));
return 0;
}
+ if (note->note->kind == 7) {
+ ndb_write_reaction_stats(txn, note->note, note_key);
+ }
+
return note_key;
}
// only to be called from the writer thread
-static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t version)
+static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
{
int rc;
MDB_val key, val;
@@ -1171,7 +1197,7 @@ static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t vers
val.mv_data = &version;
val.mv_size = sizeof(version);
- if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
+ if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
ndb_debug("write version to ndb_meta failed: %s\n",
mdb_strerror(rc));
return;
@@ -1186,11 +1212,13 @@ static void *ndb_writer_thread(void *data)
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
int i, popped, done, any_note;
uint64_t note_nkey;
- MDB_txn *txn;
+ MDB_txn *mdb_txn = NULL;
+ struct ndb_txn txn;
+ ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
done = 0;
while (!done) {
- txn = NULL;
+ txn.mdb_txn = NULL;
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
//ndb_debug("writer popped %d items\n", popped);
@@ -1206,7 +1234,7 @@ static void *ndb_writer_thread(void *data)
}
}
- if (any_note && mdb_txn_begin(writer->lmdb->env, NULL, 0, &txn))
+ if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn))
{
fprintf(stderr, "writer thread txn_begin failed");
// should definitely not happen unless DB is full
@@ -1224,28 +1252,27 @@ static void *ndb_writer_thread(void *data)
continue;
case NDB_WRITER_PROFILE:
note_nkey =
- ndb_write_note(writer->lmdb, txn, &msg->note);
+ ndb_write_note(&txn, &msg->note);
if (msg->profile.record.builder) {
// only write if parsing didn't fail
- ndb_write_profile(writer->lmdb, txn,
- &msg->profile,
+ ndb_write_profile(&txn, &msg->profile,
note_nkey);
}
break;
case NDB_WRITER_NOTE:
- ndb_write_note(writer->lmdb, txn, &msg->note);
+ ndb_write_note(&txn, &msg->note);
break;
case NDB_WRITER_DBMETA:
- ndb_write_version(writer->lmdb, txn, msg->ndb_meta.version);
+ ndb_write_version(&txn, msg->ndb_meta.version);
break;
case NDB_WRITER_PROFILE_LAST_FETCH:
- ndb_writer_last_profile_fetch(writer->lmdb, txn, &msg->last_fetch);
+ ndb_writer_last_profile_fetch(&txn, &msg->last_fetch);
break;
}
}
// commit writes
- if (any_note && mdb_txn_commit(txn)) {
+ if (any_note && !ndb_end_query(&txn)) {
fprintf(stderr, "writer thread txn commit failed");
assert(false);
}
diff --git a/nostrdb.h b/nostrdb.h
@@ -40,7 +40,7 @@ struct ndb_search {
// required to keep a read
struct ndb_txn {
- struct ndb *ndb;
+ struct ndb_lmdb *lmdb;
void *mdb_txn;
};
@@ -196,7 +196,7 @@ int ndb_begin_query(struct ndb *, struct ndb_txn *);
int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
int ndb_search_profile_next(struct ndb_search *search);
void ndb_search_profile_end(struct ndb_search *search);
-void ndb_end_query(struct ndb_txn *);
+int ndb_end_query(struct ndb_txn *);
int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at);
uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key);
void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pubkey, size_t *len, uint64_t *primkey);