damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

commit fafe3b4b3e4ed513d772c08fea3f744e3c6a422c
parent 69c7acea76c3b931d404148ff2779278587a198c
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 13 Sep 2023 05:40:44 -0700

ndb: add nostrdb migrations

Diffstat:
Mnostrdb/Ndb.swift | 2+-
Mnostrdb/nostrdb.c | 163+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mnostrdb/nostrdb.h | 5++++-
3 files changed, 164 insertions(+), 6 deletions(-)

diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift @@ -33,7 +33,7 @@ class Ndb { let ok = path.withCString { testdir in var ok = false while !ok && mapsize > 1024 * 1024 * 700 { - ok = ndb_init(&ndb_p, testdir, mapsize, ingest_threads) != 0 + ok = ndb_init(&ndb_p, testdir, mapsize, ingest_threads, 0) != 0 if !ok { mapsize /= 2 } diff --git a/nostrdb/nostrdb.c b/nostrdb/nostrdb.c @@ -27,6 +27,7 @@ static const int THREAD_QUEUE_BATCH = 4096; // the maximum size of inbox queues static const int DEFAULT_QUEUE_SIZE = 1000000; +#define ndb_flag_set(flags, f) ((flags & f) == f) #define NDB_PARSED_ID (1 << 0) #define NDB_PARSED_PUBKEY (1 << 1) @@ -37,6 +38,12 @@ static const int DEFAULT_QUEUE_SIZE = 1000000; #define NDB_PARSED_TAGS (1 << 6) #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) +typedef int (*ndb_migrate_fn)(struct ndb *); + +struct ndb_migration { + ndb_migrate_fn fn; +}; + struct ndb_profile_record_builder { flatcc_builder_t *builder; void *flatbuf; @@ -61,9 +68,15 @@ enum ndb_dbs { NDB_DB_PROFILE, NDB_DB_NOTE_ID, NDB_DB_PROFILE_PK, + NDB_DB_NDB_META, NDB_DBS, }; +// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) +enum ndb_meta_key { + NDB_META_KEY_VERSION = 1 +}; + struct ndb_json_parser { const char *json; int json_len; @@ -100,6 +113,7 @@ struct ndb { struct ndb_lmdb lmdb; struct ndb_ingester ingester; struct ndb_writer writer; + int version; // lmdb environ handles, etc }; @@ -109,6 +123,49 @@ struct ndb_tsid { uint64_t timestamp; }; +// Migrations +// + +static int ndb_migrate_user_search_indices(struct ndb *ndb) +{ + return 1; +} + +static struct ndb_migration MIGRATIONS[] = { + //{ .fn = ndb_migrate_user_search_indices } +}; + + +int ndb_db_version(struct ndb *ndb) +{ + int rc; + uint64_t version, version_key; + MDB_val k, v; + MDB_txn *txn; + + version_key = NDB_META_KEY_VERSION; + k.mv_data = &version_key; + k.mv_size = sizeof(version_key); + + if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) { + fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc); + return -1; + } + + if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) { + version = -1; + } else { + if (v.mv_size != 8) { + fprintf(stderr, "run_migrations: invalid version size?"); + return 0; + } + version = *((uint64_t*)v.mv_data); + } + + mdb_txn_abort(txn); + return version; +} + /** From LMDB: Compare two items lexically */ static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { int diff; @@ -175,6 +232,7 @@ enum ndb_writer_msgtype { NDB_WRITER_QUIT, // kill thread immediately NDB_WRITER_NOTE, // write a note to the db NDB_WRITER_PROFILE, // write a profile to the db + NDB_WRITER_DBMETA, // write ndb metadata }; struct ndb_ingester_event { @@ -199,11 +257,17 @@ struct ndb_ingester_msg { }; }; +struct ndb_writer_ndb_meta { + // these are 64 bit because I'm paranoid of db-wide alignment issues + uint64_t version; +}; + struct ndb_writer_msg { enum ndb_writer_msgtype type; union { struct ndb_writer_note note; struct ndb_writer_profile profile; + struct ndb_writer_ndb_meta ndb_meta; }; }; @@ -236,6 +300,12 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], return 1; } +static inline int ndb_writer_queue_msg(struct ndb_writer *writer, + struct ndb_writer_msg *msg) +{ + return prot_queue_push(&writer->inbox, msg); +} + static inline int ndb_writer_queue_msgs(struct ndb_writer *writer, struct ndb_writer_msg *msgs, int num_msgs) @@ -714,6 +784,27 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn, return note_key; } +// only to be called from the writer thread +static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t version) +{ + int rc; + MDB_val key, val; + uint64_t version_key; + + version_key = NDB_META_KEY_VERSION; + + key.mv_data = &version_key; + key.mv_size = sizeof(version_key); + val.mv_data = &version; + val.mv_size = sizeof(version); + + if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { + ndb_debug("write version to ndb_meta failed: %s\n", + mdb_strerror(rc)); + return; + } +} + static void *ndb_writer_thread(void *data) { struct ndb_writer *writer = data; @@ -734,6 +825,7 @@ static void *ndb_writer_thread(void *data) switch (msg->type) { case NDB_WRITER_NOTE: any_note = 1; break; case NDB_WRITER_PROFILE: any_note = 1; break; + case NDB_WRITER_DBMETA: any_note = 1; break; case NDB_WRITER_QUIT: break; } } @@ -767,6 +859,9 @@ static void *ndb_writer_thread(void *data) case NDB_WRITER_NOTE: ndb_write_note(writer->lmdb, txn, &msg->note); break; + case NDB_WRITER_DBMETA: + ndb_write_version(writer->lmdb, txn, msg->ndb_meta.version); + break; } } @@ -962,7 +1057,7 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map } if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { - fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); + fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); return 0; } @@ -995,6 +1090,12 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map return 0; } + // ndb metadata (db version, etc) + if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { + fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); + return 0; + } + // id+ts index flags unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; @@ -1021,7 +1122,56 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map return 1; } -int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads) +static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) +{ + struct ndb_writer_msg msg; + msg.type = NDB_WRITER_DBMETA; + msg.ndb_meta.version = version; + return ndb_writer_queue_msg(&ndb->writer, &msg); +} + +static int ndb_run_migrations(struct ndb *ndb) +{ + uint64_t version, latest_version, i; + + latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); + + if ((version = ndb_db_version(ndb)) == -1) { + version = latest_version; + + // no version found. fresh db? + if (!ndb_queue_write_version(ndb, version)) { + fprintf(stderr, "run_migrations: failed writing db version"); + return 0; + } + + return 1; + } + + if (version < latest_version) + fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", + (int)version, (int)latest_version); + + for (i = version; i < latest_version; i++) { + if (!MIGRATIONS[i].fn(ndb)) { + fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); + return 0; + } + + if (!ndb_queue_write_version(ndb, i+1)) { + fprintf(stderr, "run_migrations: failed writing db version"); + return 0; + } + + version = i+1; + } + + ndb->version = version; + + return 1; +} + +int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads, int flags) { struct ndb *ndb; //MDB_dbi ind_id; // TODO: ind_pk, etc @@ -1036,16 +1186,21 @@ int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingest return 0; if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) { - fprintf(stderr, "ndb_writer_init failed"); + fprintf(stderr, "ndb_writer_init failed\n"); return 0; } if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads)) { - fprintf(stderr, "failed to initialize %d ingester thread(s)", + fprintf(stderr, "failed to initialize %d ingester thread(s)\n", ingester_threads); return 0; } + if (!ndb_flag_set(flags, NDB_FLAG_NOMIGRATE) && !ndb_run_migrations(ndb)) { + fprintf(stderr, "failed to run migrations\n"); + return 0; + } + // Initialize LMDB environment and spin up threads return 1; } diff --git a/nostrdb/nostrdb.h b/nostrdb/nostrdb.h @@ -7,6 +7,8 @@ #define NDB_PACKED_STR 0x1 #define NDB_PACKED_ID 0x2 +#define NDB_FLAG_NOMIGRATE (1 << 0) + //#define DEBUG 1 #ifdef DEBUG @@ -158,7 +160,8 @@ int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair); int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[32], unsigned char signature[64]); // NDB -int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_threads); +int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_threads, int flags); +int ndb_db_version(struct ndb *ndb); int ndb_process_event(struct ndb *, const char *json, int len); int ndb_process_events(struct ndb *, const char *ldjson, size_t len); int ndb_begin_query(struct ndb *, struct ndb_txn *);