damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

commit 1cf898e0b255d4d4e1f3efadd0120c8e390d8a57
parent 502ceee6d456d3583824d19f550e18eba4cdff8e
Author: William Casarin <jb55@jb55.com>
Date:   Mon, 23 Oct 2023 10:28:42 +0800

ndb: update nostrdb

This includes the new profile fetched_at logic and reaction stats.

When receiving new profiles, nostrdb will record when it was last
received in a new database. This database is a mapping from Pubkey to
timestamp.

You can manually read/write to this table using:

ndb_read_last_profile_fetch
ndb_write_last_profile_fetch

This patch also includes the new reaction counting metadata table. It is
not used yet (but reactions are still counted!)

Changelog-Added: Added reaction counters to nostrdb
Changelog-Added: Record when profile is last fetched in nostrdb

Diffstat:
Mnostrdb/nostrdb.c | 359+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
Mnostrdb/nostrdb.h | 7++++---
2 files changed, 291 insertions(+), 75 deletions(-)

diff --git a/nostrdb/nostrdb.c b/nostrdb/nostrdb.c @@ -16,6 +16,8 @@ #include "bindings/c/profile_json_parser.h" #include "bindings/c/profile_builder.h" +#include "bindings/c/meta_builder.h" +#include "bindings/c/meta_reader.h" #include "bindings/c/profile_verifier.h" #include "secp256k1.h" #include "secp256k1_ecdh.h" @@ -155,8 +157,7 @@ static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, key->search[sizeof(key->search) - 1] = '\0'; } -static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb, - MDB_txn *txn, +static int ndb_write_profile_search_index(struct ndb_txn *txn, struct ndb_search_key *index_key, uint64_t profile_key) { @@ -168,7 +169,9 @@ static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb, val.mv_data = &profile_key; val.mv_size = sizeof(profile_key); - if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], &key, &val, 0))) { + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], + &key, &val, 0))) + { ndb_debug("ndb_write_profile_search_index failed: %s\n", mdb_strerror(rc)); return 0; @@ -179,8 +182,7 @@ static int ndb_write_profile_search_index(struct ndb_lmdb *lmdb, // map usernames and display names to profile keys for user searching -static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb, - MDB_txn *txn, +static int ndb_write_profile_search_indices(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key, void *profile_root) @@ -199,8 +201,7 @@ static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb, if (name) { ndb_make_search_key(&index, note->pubkey, note->created_at, name); - if (!ndb_write_profile_search_index(lmdb, txn, &index, - profile_key)) + if (!ndb_write_profile_search_index(txn, &index, profile_key)) return 0; } @@ -211,19 +212,30 @@ static int ndb_write_profile_search_indices(struct ndb_lmdb *lmdb, } ndb_make_search_key(&index, note->pubkey, note->created_at, display_name); - if (!ndb_write_profile_search_index(lmdb, txn, &index, - profile_key)) + if (!ndb_write_profile_search_index(txn, &index, profile_key)) return 0; } return 1; } -int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) + +static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) { - txn->ndb = ndb; + txn->lmdb = &ndb->lmdb; MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; - return mdb_txn_begin(ndb->lmdb.env, NULL, 0, mdb_txn) == 0; + return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; +} + +int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) +{ + return _ndb_begin_query(ndb, txn, MDB_RDONLY); +} + +// this should only be used in migrations, etc +static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) +{ + return _ndb_begin_query(ndb, txn, 0); } @@ -243,8 +255,8 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) size_t len; int count; - if (!ndb_begin_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_query failed\n"); + if (!ndb_begin_rw_query(ndb, &txn)) { + fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n"); return 0; } @@ -268,8 +280,7 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) return 0; } - if (!ndb_write_profile_search_indices(&ndb->lmdb, txn.mdb_txn, - note, profile_key, + if (!ndb_write_profile_search_indices(&txn, note, profile_key, profile_root)) { fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); @@ -282,7 +293,8 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) fprintf(stderr, "migrated %d profiles to include search indices\n", count); mdb_cursor_close(cur); - mdb_txn_commit(txn.mdb_txn); + + ndb_end_query(&txn); return 1; } @@ -442,11 +454,15 @@ struct ndb_writer_ndb_meta { uint64_t version; }; +// Used in the writer thread when writing ndb_profile_fetch_record's +// kv = pubkey: recor struct ndb_writer_last_fetch { unsigned char pubkey[32]; uint64_t fetched_at; }; +// The different types of messages that the writer thread can write to the +// database struct ndb_writer_msg { enum ndb_writer_msgtype type; union { @@ -457,9 +473,10 @@ struct ndb_writer_msg { }; }; -void ndb_end_query(struct ndb_txn *txn) +int ndb_end_query(struct ndb_txn *txn) { - mdb_txn_abort(txn->mdb_txn); + // this works on read or write queries. + return mdb_txn_commit(txn->mdb_txn) == 0; } int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], @@ -504,18 +521,21 @@ static int ndb_writer_queue_note(struct ndb_writer *writer, return prot_queue_push(&writer->inbox, &msg); } -static void ndb_writer_last_profile_fetch(struct ndb_lmdb *lmdb, MDB_txn *txn, - struct ndb_writer_last_fetch *w) +static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, + const unsigned char *pubkey, + uint64_t fetched_at) { int rc; MDB_val key, val; - key.mv_data = (unsigned char*)&w->pubkey; - key.mv_size = sizeof(w->pubkey); - val.mv_data = &w->fetched_at; - val.mv_size = sizeof(w->fetched_at); + key.mv_data = (unsigned char*)pubkey; + key.mv_size = 32; + val.mv_data = &fetched_at; + val.mv_size = sizeof(fetched_at); - if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &key, &val, 0))) { + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], + &key, &val, 0))) + { ndb_debug("write version to ndb_meta failed: %s\n", mdb_strerror(rc)); return; @@ -524,6 +544,46 @@ static void ndb_writer_last_profile_fetch(struct ndb_lmdb *lmdb, MDB_txn *txn, //fprintf(stderr, "writing version %" PRIu64 "\n", version); } + +// We just received a profile that we haven't processed yet, but it could +// be an older one! Make sure we only write last fetched profile if it's a new +// one +// +// To do this, we first check the latest profile in the database. If the +// created_date for this profile note is newer, then we write a +// last_profile_fetch record, otherwise we do not. +// +// WARNING: This function is only valid when called from the writer thread +static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, + struct ndb_note *note) +{ + size_t len; + uint64_t profile_key, note_key; + void *root; + struct ndb_note *last_profile; + NdbProfileRecord_table_t record; + + if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { + record = NdbProfileRecord_as_root(root); + note_key = NdbProfileRecord_note_key(record); + last_profile = ndb_get_note_by_key(txn, note_key, &len); + if (last_profile == NULL) { + return 0; + } + + // found profile, let's see if it's newer than ours + if (note->created_at > last_profile->created_at) { + // this is a new profile note, record last fetched time + ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); + } + } else { + // couldn't fetch profile. record last fetched time + ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); + } + + return 1; +} + int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at) { @@ -536,8 +596,8 @@ int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, } // get some value based on a clustered id key -int ndb_get_tsid(MDB_txn *txn, struct ndb_lmdb *lmdb, enum ndb_dbs db, - const unsigned char *id, MDB_val *val) +int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, + MDB_val *val) { MDB_val k, v; MDB_cursor *cur; @@ -550,7 +610,7 @@ int ndb_get_tsid(MDB_txn *txn, struct ndb_lmdb *lmdb, enum ndb_dbs db, k.mv_data = &tsid; k.mv_size = sizeof(tsid); - mdb_cursor_open(txn, lmdb->dbs[db], &cur); + mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur); // Position cursor at the next key greater than or equal to the specified key if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) { @@ -582,7 +642,7 @@ static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, k.mv_data = &key; k.mv_size = sizeof(key); - if (mdb_get(txn->mdb_txn, txn->ndb->lmdb.dbs[store], &k, &v)) { + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); return NULL; } @@ -602,7 +662,7 @@ static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, if (len) *len = 0; - if (!ndb_get_tsid(txn->mdb_txn, &txn->ndb->lmdb, ind, pk, &k)) { + if (!ndb_get_tsid(txn, ind, pk, &k)) { //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); return 0; } @@ -610,7 +670,7 @@ static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, if (primkey) *primkey = *(uint64_t*)k.mv_data; - if (mdb_get(txn->mdb_txn, txn->ndb->lmdb.dbs[store], &k, &v)) { + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); return 0; } @@ -638,7 +698,7 @@ static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, { MDB_val k; - if (!ndb_get_tsid(txn->mdb_txn, &txn->ndb->lmdb, db, id, &k)) + if (!ndb_get_tsid(txn, db, id, &k)) return 0; return *(uint32_t*)k.mv_data; @@ -664,37 +724,52 @@ void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); } -uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key) +uint64_t +ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) { - size_t len; - void *ret = ndb_lookup_by_key(txn, profile_key, NDB_DB_PROFILE_LAST_FETCH, &len); - if (ret == NULL) + MDB_val k, v; + + k.mv_data = (unsigned char*)pubkey; + k.mv_size = 32; + + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { + //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); return 0; - assert(len == sizeof(uint64_t)); - return *((uint64_t*)ret); + } + + return *((uint64_t*)v.mv_data); } -static int ndb_has_note(MDB_txn *txn, struct ndb_lmdb *lmdb, const unsigned char *id) +static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) { MDB_val val; - if (!ndb_get_tsid(txn, lmdb, NDB_DB_NOTE_ID, id, &val)) + if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) return 0; return 1; } +static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, + MDB_txn *mdb_txn) +{ + txn->lmdb = lmdb; + txn->mdb_txn = mdb_txn; +} + static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) { unsigned char id[32]; struct ndb_ingest_controller *c = data; + struct ndb_txn txn; hex_decode(hexid, 64, id, sizeof(id)); // let's see if we already have it - if (!ndb_has_note(c->read_txn, c->lmdb, id)) + ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); + if (!ndb_has_note(&txn, id)) return NDB_IDRES_CONT; return NDB_IDRES_STOP; @@ -785,8 +860,12 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, size_t note_size, struct ndb_writer_msg *out) { + //printf("ndb_ingester_process_note "); + //print_hex(note->id, 32); + //printf("\n"); + // Verify! If it's an invalid note we don't need to - // bothter writing it to the database + // bother writing it to the database if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { ndb_debug("signature verification failed\n"); return 0; @@ -957,8 +1036,8 @@ int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const cha k.mv_size = sizeof(s); if ((rc = mdb_cursor_open(txn->mdb_txn, - txn->ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], - cursor))) { + txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], + cursor))) { printf("search_profile: cursor opened failed: %s\n", mdb_strerror(rc)); return 0; @@ -1040,7 +1119,7 @@ static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) return 0; } -static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn, +static int ndb_write_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, uint64_t note_key) { @@ -1071,11 +1150,11 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn, //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); // get dbs - profile_db = lmdb->dbs[NDB_DB_PROFILE]; - pk_db = lmdb->dbs[NDB_DB_PROFILE_PK]; + profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; + pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; // get new key - profile_key = ndb_get_last_key(txn, profile_db) + 1; + profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; // write profile to profile store key.mv_data = &profile_key; @@ -1084,7 +1163,7 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn, val.mv_size = flatbuf_len; //ndb_debug("profile_len %ld\n", profile->profile_len); - if ((rc = mdb_put(txn, profile_db, &key, &val, 0))) { + if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); return 0; } @@ -1097,14 +1176,20 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn, val.mv_data = &profile_key; val.mv_size = sizeof(profile_key); - if ((rc = mdb_put(txn, pk_db, &key, &val, 0))) { + // write last fetched record + if (!ndb_maybe_write_last_profile_fetch(txn, note)) { + ndb_debug("failed to write last profile fetched record\n"); + return 0; + } + + if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", profile_key, mdb_strerror(rc)); return 0; } // write name, display_name profile search indices - if (!ndb_write_profile_search_indices(lmdb, txn, note, profile_key, + if (!ndb_write_profile_search_indices(txn, note, profile_key, flatbuf)) { ndb_debug("failed to write profile search indices\n"); return 0; @@ -1113,7 +1198,127 @@ static int ndb_write_profile(struct ndb_lmdb *lmdb, MDB_txn *txn, return 1; } -static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn, +// find the last id tag in a note (e, p, etc) +static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) +{ + unsigned char *last = NULL; + struct ndb_iterator iter; + struct ndb_str str; + + // get the liked event id (last id) + ndb_tags_iterate_start(note, &iter); + + while (ndb_tags_iterate_next(&iter)) { + if (iter.tag->count < 2) + continue; + + str = ndb_note_str(note, &iter.tag->strs[0]); + + // assign liked to the last e tag + if (str.flag == NDB_PACKED_STR && str.str[0] == type) { + str = ndb_note_str(note, &iter.tag->strs[1]); + if (str.flag == NDB_PACKED_ID) + last = str.id; + } + } + + return last; +} + +void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) +{ + MDB_val k, v; + + k.mv_data = (unsigned char*)id; + k.mv_size = 32; + + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { + ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); + return NULL; + } + + if (len) + *len = v.mv_size; + + return v.mv_data; +} + +// When receiving a reaction note, look for the liked id and increase the +// reaction counter in the note metadata database +// +// TODO: I found some bugs when implementing this feature. If the same note id +// is processed multiple times in the same ingestion block, then it will count +// the like twice. This is because it hasn't been written to the DB yet and the +// ingestor doesn't know about notes that are being processed at the same time. +// One fix for this is to maintain a hashtable in the ingestor and make sure +// the same note is not processed twice. +// +// I'm not sure how common this would be, so I'm not going to worry about it +// for now, but it's something to keep in mind. +static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) +{ + size_t len; + void *root; + int reactions, rc; + MDB_val key, val; + NdbEventMeta_table_t meta; + unsigned char *liked = ndb_note_last_id_tag(note, 'e'); + + if (liked == NULL) + return 0; + + root = ndb_get_note_meta(txn, liked, &len); + + flatcc_builder_t builder; + flatcc_builder_init(&builder); + NdbEventMeta_start_as_root(&builder); + + // no meta record, let's make one + if (root == NULL) { + NdbEventMeta_reactions_add(&builder, 1); + } else { + // clone existing and add to it + meta = NdbEventMeta_as_root(root); + + reactions = NdbEventMeta_reactions_get(meta); + NdbEventMeta_clone(&builder, meta); + NdbEventMeta_reactions_add(&builder, reactions + 1); + } + + NdbProfileRecord_end_as_root(&builder); + root = flatcc_builder_finalize_aligned_buffer(&builder, &len); + assert(((uint64_t)root % 8) == 0); + + if (root == NULL) { + ndb_debug("failed to create note metadata record\n"); + return 0; + } + + // metadata is keyed on id because we want to collect stats regardless + // if we have the note yet or not + key.mv_data = liked; + key.mv_size = 32; + + val.mv_data = root; + val.mv_size = len; + + // write the new meta record + //ndb_debug("writing stats record for "); + //print_hex(liked, 32); + //ndb_debug("\n"); + + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { + ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); + return 0; + } + + free(root); + + return 1; +} + + +static uint64_t ndb_write_note(struct ndb_txn *txn, struct ndb_writer_note *note) { int rc; @@ -1123,11 +1328,11 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn, MDB_val key, val; // get dbs - note_db = lmdb->dbs[NDB_DB_NOTE]; - id_db = lmdb->dbs[NDB_DB_NOTE_ID]; + note_db = txn->lmdb->dbs[NDB_DB_NOTE]; + id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; // get new key - note_key = ndb_get_last_key(txn, note_db) + 1; + note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; // write note to event store key.mv_data = &note_key; @@ -1135,7 +1340,7 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn, val.mv_data = note->note; val.mv_size = note->note_len; - if ((rc = mdb_put(txn, note_db, &key, &val, 0))) { + if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); return 0; } @@ -1148,17 +1353,21 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn, val.mv_data = &note_key; val.mv_size = sizeof(note_key); - if ((rc = mdb_put(txn, id_db, &key, &val, 0))) { + if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { ndb_debug("write note id index to db failed: %s\n", mdb_strerror(rc)); return 0; } + if (note->note->kind == 7) { + ndb_write_reaction_stats(txn, note->note); + } + return note_key; } // only to be called from the writer thread -static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t version) +static void ndb_write_version(struct ndb_txn *txn, uint64_t version) { int rc; MDB_val key, val; @@ -1171,7 +1380,7 @@ static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t vers val.mv_data = &version; val.mv_size = sizeof(version); - if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { ndb_debug("write version to ndb_meta failed: %s\n", mdb_strerror(rc)); return; @@ -1186,11 +1395,13 @@ static void *ndb_writer_thread(void *data) struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; int i, popped, done, any_note; uint64_t note_nkey; - MDB_txn *txn; + MDB_txn *mdb_txn = NULL; + struct ndb_txn txn; + ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); done = 0; while (!done) { - txn = NULL; + txn.mdb_txn = NULL; popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); //ndb_debug("writer popped %d items\n", popped); @@ -1206,7 +1417,7 @@ static void *ndb_writer_thread(void *data) } } - if (any_note && mdb_txn_begin(writer->lmdb->env, NULL, 0, &txn)) + if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) { fprintf(stderr, "writer thread txn_begin failed"); // should definitely not happen unless DB is full @@ -1224,33 +1435,37 @@ static void *ndb_writer_thread(void *data) continue; case NDB_WRITER_PROFILE: note_nkey = - ndb_write_note(writer->lmdb, txn, &msg->note); + ndb_write_note(&txn, &msg->note); if (msg->profile.record.builder) { // only write if parsing didn't fail - ndb_write_profile(writer->lmdb, txn, - &msg->profile, + ndb_write_profile(&txn, &msg->profile, note_nkey); } break; case NDB_WRITER_NOTE: - ndb_write_note(writer->lmdb, txn, &msg->note); + ndb_write_note(&txn, &msg->note); + //printf("wrote note "); + //print_hex(msg->note.note->id, 32); + //printf("\n"); break; case NDB_WRITER_DBMETA: - ndb_write_version(writer->lmdb, txn, msg->ndb_meta.version); + ndb_write_version(&txn, msg->ndb_meta.version); break; case NDB_WRITER_PROFILE_LAST_FETCH: - ndb_writer_last_profile_fetch(writer->lmdb, txn, &msg->last_fetch); + ndb_writer_last_profile_fetch(&txn, + msg->last_fetch.pubkey, + msg->last_fetch.fetched_at + ); break; } } // commit writes - if (any_note && mdb_txn_commit(txn)) { + if (any_note && !ndb_end_query(&txn)) { fprintf(stderr, "writer thread txn commit failed"); assert(false); } - // free notes for (i = 0; i < popped; i++) { msg = &msgs[i]; @@ -1459,7 +1674,7 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map } // note metadata db - if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_META]))) { + if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); return 0; } diff --git a/nostrdb/nostrdb.h b/nostrdb/nostrdb.h @@ -40,7 +40,7 @@ struct ndb_search { // required to keep a read struct ndb_txn { - struct ndb *ndb; + struct ndb_lmdb *lmdb; void *mdb_txn; }; @@ -196,15 +196,16 @@ int ndb_begin_query(struct ndb *, struct ndb_txn *); int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query); int ndb_search_profile_next(struct ndb_search *search); void ndb_search_profile_end(struct ndb_search *search); -void ndb_end_query(struct ndb_txn *); +int ndb_end_query(struct ndb_txn *); int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at); -uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key); +uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey); void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pubkey, size_t *len, uint64_t *primkey); void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len); uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id); uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id); struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *primkey); struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len); +void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len); void ndb_destroy(struct ndb *); // BUILDER