nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit 0f13ab2989cc585bf2af373295061e43c775c96f
parent 866dc0a15817b977f43e1104cd4a7ff7ed111ac5
Author: William Casarin <jb55@jb55.com>
Date:   Fri,  6 Oct 2023 10:04:31 -0700

record when a profile was last fetched

This will be used for optimizing profile fetching

Diffstat:
M nostrdb.c | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
M nostrdb.h | 2 +-
M test.c | 40 ++++++++++++++++++++++++++++++++++++++++
3 files changed, 114 insertions(+), 13 deletions(-)

diff --git a/nostrdb.c b/nostrdb.c @@ -454,11 +454,15 @@ struct ndb_writer_ndb_meta { uint64_t version; }; +// Used in the writer thread when writing ndb_profile_fetch_record's +// kv = pubkey: recor struct ndb_writer_last_fetch { unsigned char pubkey[32]; uint64_t fetched_at; }; +// The different types of messages that the writer thread can write to the +// database struct ndb_writer_msg { enum ndb_writer_msgtype type; union { @@ -518,15 +522,17 @@ static int ndb_writer_queue_note(struct ndb_writer *writer, } static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, - struct ndb_writer_last_fetch *w) + const unsigned char *pubkey, + uint64_t fetched_at + ) { int rc; MDB_val key, val; - key.mv_data = (unsigned char*)&w->pubkey; - key.mv_size = sizeof(w->pubkey); - val.mv_data = &w->fetched_at; - val.mv_size = sizeof(w->fetched_at); + key.mv_data = (unsigned char*)pubkey; + key.mv_size = 32; + val.mv_data = &fetched_at; + val.mv_size = sizeof(fetched_at); if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &key, &val, 0))) @@ -539,6 +545,46 @@ static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, //fprintf(stderr, "writing version %" PRIu64 "\n", version); } + +// We just received a profile that we haven't processed yet, but it could +// be an older one! Make sure we only write last fetched profile if it's a new +// one +// +// To do this, we first check the latest profile in the database. If the +// created_date for this profile note is newer, then we write a +// last_profile_fetch record, otherwise we do not. 
+// +// WARNING: This function is only valid when called from the writer thread +static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, + struct ndb_note *note) +{ + size_t len; + uint64_t profile_key, note_key; + void *root; + struct ndb_note *last_profile; + NdbProfileRecord_table_t record; + + if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { + record = NdbProfileRecord_as_root(root); + note_key = NdbProfileRecord_note_key(record); + last_profile = ndb_get_note_by_key(txn, note_key, &len); + if (last_profile == NULL) { + return 0; + } + + // found profile, let's see if it's newer than ours + if (note->created_at > last_profile->created_at) { + // this is a new profile note, record last fetched time + ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); + } + } else { + // couldn't fetch profile. record last fetched time + ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); + } + + return 1; +} + int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at) { @@ -679,14 +725,20 @@ void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); } -uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key) +uint64_t +ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) { - size_t len; - void *ret = ndb_lookup_by_key(txn, profile_key, NDB_DB_PROFILE_LAST_FETCH, &len); - if (ret == NULL) + MDB_val k, v; + + k.mv_data = (unsigned char*)pubkey; + k.mv_size = 32; + + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { + ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); return 0; - assert(len == sizeof(uint64_t)); - return *((uint64_t*)ret); + } + + return *((uint64_t*)v.mv_data); } @@ -1125,6 +1177,12 @@ static int ndb_write_profile(struct ndb_txn *txn, val.mv_data = &profile_key; val.mv_size = sizeof(profile_key); 
+ // write last fetched record + if (!ndb_maybe_write_last_profile_fetch(txn, note)) { + ndb_debug("failed to write last profile fetched record\n"); + return 0; + } + if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", profile_key, mdb_strerror(rc)); @@ -1395,7 +1453,10 @@ static void *ndb_writer_thread(void *data) ndb_write_version(&txn, msg->ndb_meta.version); break; case NDB_WRITER_PROFILE_LAST_FETCH: - ndb_writer_last_profile_fetch(&txn, &msg->last_fetch); + ndb_writer_last_profile_fetch(&txn, + msg->last_fetch.pubkey, + msg->last_fetch.fetched_at + ); break; } } diff --git a/nostrdb.h b/nostrdb.h @@ -198,7 +198,7 @@ int ndb_search_profile_next(struct ndb_search *search); void ndb_search_profile_end(struct ndb_search *search); int ndb_end_query(struct ndb_txn *); int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, uint64_t fetched_at); -uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, uint64_t profile_key); +uint64_t ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey); void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pubkey, size_t *len, uint64_t *primkey); void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len); uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id); diff --git a/test.c b/test.c @@ -46,6 +46,45 @@ static void print_search(struct ndb_txn *txn, struct ndb_search *search) printf("\n"); } +static void test_fetched_at() +{ + struct ndb *ndb; + size_t mapsize; + int ingester_threads; + struct ndb_txn txn; + uint64_t fetched_at; + + mapsize = 1024 * 1024 * 100; + ingester_threads = 1; + + assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0)); + + const unsigned char pubkey[] = { 0x87, 0xfb, 0xc6, 0xd5, 0x98, 0x31, 0xa8, 0x23, 0xa4, 0x5d, 0x10, 0x1f, + 0x86, 0x94, 0x2c, 0x41, 0xcd, 0xe2, 0x90, 0x23, 0xf4, 0x09, 0x20, 0x24, + 0xa2, 0x7c, 0x50, 0x10, 
0x3c, 0x15, 0x40, 0x01 }; + + const char profile_1[] = "[\"EVENT\",{\"id\": \"a44eb8fb6931d6155b04038bef0624407e46c85c61e5758392cbb615f00184ca\",\"pubkey\": \"87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001\",\"created_at\": 1695593354,\"kind\": 0,\"tags\": [],\"content\": \"{\\\"name\\\":\\\"b\\\"}\",\"sig\": \"7540bbde4b4479275e20d95acaa64027359a73989927f878825093cba2f468bd8e195919a77b4c230acecddf92e6b4bee26918b0c0842f84ec7c1fae82453906\"}]"; + + uint64_t t1 = time(NULL); + + // process the first event, this should set the fetched_at + assert(ndb_process_client_event(ndb, profile_1, sizeof(profile_1))); + + // we sleep for a second because we want to make sure the fetched_at is not + // updated for the next record, which is an older profile. + sleep(1); + + assert(ndb_begin_query(ndb, &txn)); + + // this should be set to t1 + fetched_at = ndb_read_last_profile_fetch(&txn, pubkey); + + assert(fetched_at == t1); + + //const char profile_2[] = "[\"EVENT\",{\"id\": \"9b2861dda8fc602ec2753f92f1a443c9565de606e0c8f4fd2db4f2506a3b13ca\",\"pubkey\": \"87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001\",\"created_at\": 1695593347,\"kind\": 0,\"tags\": [],\"content\": \"{\\\"name\\\":\\\"a\\\"}\",\"sig\": \"f48da228f8967d33c3caf0a78f853b5144631eb86c7777fd25949123a5272a92765a0963d4686dd0efe05b7a9b986bfac8d43070b234153acbae5006d5a90f31\"}]"; + + +} static void test_reaction_counter() { @@ -821,6 +860,7 @@ static void test_fast_strchr() int main(int argc, const char *argv[]) { test_migrate(); + test_fetched_at(); test_profile_updates(); test_reaction_counter(); test_load_profiles();