commit 043d1d358a79d8c2935c136ced22c3fb162e711d
parent 8ee1dc79057ababc8f2bb0ef44e63116f615b965
Author: William Casarin <jb55@jb55.com>
Date: Tue, 12 Sep 2023 16:14:09 -0700
migrations: add initial migration logic
This is important since we're about to push the first version of nostrdb
to thousands of iphones.
Diffstat:
6 files changed, 198 insertions(+), 12 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -6,6 +6,8 @@ shell.nix
*.dSYM
*.zst
testdata/many-events.json
+testdata/db/ndb-v0.tar
+testdata/db/v0
TODO.bak
test_contacts_ndb_note
bench
diff --git a/Makefile b/Makefile
@@ -14,6 +14,7 @@ C_BINDINGS_META=bindings/c/meta_builder.h bindings/c/meta_reader.h bindings/c/me
C_BINDINGS_COMMON=bindings/c/flatbuffers_common_builder.h bindings/c/flatbuffers_common_reader.h
C_BINDINGS=$(C_BINDINGS_COMMON) $(C_BINDINGS_PROFILE) $(C_BINDINGS_META)
BINDINGS=bindings
+CHECKDATA=testdata/db/v0/data.mdb
all: lib bindings
@@ -21,9 +22,10 @@ lib: benches test
bindings: bindings-swift bindings-c
-check: test
+# Run the test suite against a freshly unpacked v0 database fixture. The
+# fixture is removed afterwards whether or not the tests pass, and the exit
+# status of ./test is what make sees, so a failing test fails `make check`.
+check: test $(CHECKDATA)
rm -rf testdata/db/*.mdb
- ./test
+	rc=0; ./test || rc=$$?; rm -rf testdata/db/v0; exit $$rc
clean:
rm -rf test bench bench-ingest bench-ingest-many
@@ -122,6 +124,17 @@ bench: bench.c $(DEPS)
bench-ingest: bench-ingest.c $(DEPS)
$(CC) $(CFLAGS) bench-ingest.c $(LDS) -o $@
+# Download the compressed v0 database fixture.
+testdata/db/ndb-v0.tar.zst:
+	curl https://cdn.jb55.com/s/ndb-v0.tar.zst -o $@
+
+# Decompress the fixture archive.
+testdata/db/ndb-v0.tar: testdata/db/ndb-v0.tar.zst
+	zstd -d < $< > $@
+
+# Unpack the archive's top-level v0/ directory straight into testdata/db,
+# replacing any previous copy, so nothing is ever left in the source root.
+testdata/db/v0/data.mdb: testdata/db/ndb-v0.tar
+	@rm -rf testdata/db/v0
+	@tar xf $< -C testdata/db
+
testdata/many-events.json.zst:
curl https://cdn.jb55.com/s/many-events.json.zst -o $@
diff --git a/README.md b/README.md
@@ -9,3 +9,10 @@ but it is custom built for nostr events.
These events are then memory-mapped inside lmdb, enabling insanely fast,
zero-copy access and querying.
+
+The design of nostrdb is copied almost entirely from strfry[1], the
+fastest nostr relay. The difference is that nostrdb is meant to be embeddable
+as a C library into any application, and does not support full relay
+functionality (yet?).
+
+[1]: https://github.com/hoytech/strfry
diff --git a/nostrdb.c b/nostrdb.c
@@ -27,6 +27,7 @@ static const int THREAD_QUEUE_BATCH = 4096;
// the maximum size of inbox queues
static const int DEFAULT_QUEUE_SIZE = 1000000;
+// Nonzero when every bit of `f` is set in `flags`. Both arguments are
+// parenthesized so compound expressions such as (A | B) expand correctly.
+#define ndb_flag_set(flags, f) (((flags) & (f)) == (f))
#define NDB_PARSED_ID (1 << 0)
#define NDB_PARSED_PUBKEY (1 << 1)
@@ -37,6 +38,12 @@ static const int DEFAULT_QUEUE_SIZE = 1000000;
#define NDB_PARSED_TAGS (1 << 6)
#define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)
+// A migration step: receives the database handle and returns nonzero on
+// success, 0 on failure.
+typedef int (*ndb_migrate_fn)(struct ndb *);
+
+// One entry in the MIGRATIONS table.
+struct ndb_migration {
+ ndb_migrate_fn fn; // function that performs this migration step
+};
+
struct ndb_profile_record_builder {
flatcc_builder_t *builder;
void *flatbuf;
@@ -67,7 +74,7 @@ enum ndb_dbs {
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
- NDB_DB_VERSION = 1
+ NDB_META_KEY_VERSION = 1
};
struct ndb_json_parser {
@@ -106,6 +113,7 @@ struct ndb {
struct ndb_lmdb lmdb;
struct ndb_ingester ingester;
struct ndb_writer writer;
+ int version;
// lmdb environ handles, etc
};
@@ -115,6 +123,49 @@ struct ndb_tsid {
uint64_t timestamp;
};
+// Migrations
+//
+
+// Stub migration step: performs no work and always returns 1 (success).
+static int ndb_migrate_user_search_indices(struct ndb *ndb)
+{
+	(void)ndb; // the handle is not used by this stub
+	return 1;
+}
+
+// Ordered migration table: entry i upgrades a version-i database to
+// version i+1, so the number of entries is the latest schema version.
+// The table is currently empty; the only candidate entry is commented out.
+static struct ndb_migration MIGRATIONS[] = {
+ //{ .fn = ndb_migrate_user_search_indices }
+};
+
+
+// Read the stored database version from the NDB metadata db.
+//
+// Returns the stored version (truncated to int). Returns -1 when the
+// transaction cannot be started or when no version record can be read, and
+// 0 when a record exists but is not 8 bytes long.
+int ndb_db_version(struct ndb *ndb)
+{
+	int rc, version;
+	uint64_t version_key;
+	MDB_val k, v;
+	MDB_txn *txn;
+
+	version_key = NDB_META_KEY_VERSION;
+	k.mv_data = &version_key;
+	k.mv_size = sizeof(version_key);
+
+	// this function only reads, so open a read-only transaction
+	if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, MDB_RDONLY, &txn))) {
+		fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc);
+		return -1;
+	}
+
+	if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) {
+		version = -1;
+	} else if (v.mv_size != sizeof(uint64_t)) {
+		fprintf(stderr, "ndb_db_version: invalid version size?\n");
+		version = 0;
+	} else {
+		version = (int)*((uint64_t*)v.mv_data);
+	}
+
+	// single exit: the transaction is released on every path past this point
+	mdb_txn_abort(txn);
+	return version;
+}
+
/** From LMDB: Compare two items lexically */
static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) {
int diff;
@@ -181,6 +232,7 @@ enum ndb_writer_msgtype {
NDB_WRITER_QUIT, // kill thread immediately
NDB_WRITER_NOTE, // write a note to the db
NDB_WRITER_PROFILE, // write a profile to the db
+ NDB_WRITER_DBMETA, // write ndb metadata
};
struct ndb_ingester_event {
@@ -205,11 +257,17 @@ struct ndb_ingester_msg {
};
};
+// Payload of an NDB_WRITER_DBMETA writer message.
+struct ndb_writer_ndb_meta {
+ // these are 64 bit because I'm paranoid about db-wide alignment issues
+ uint64_t version; // db version to store under NDB_META_KEY_VERSION
+};
+
struct ndb_writer_msg {
enum ndb_writer_msgtype type;
union {
struct ndb_writer_note note;
struct ndb_writer_profile profile;
+ struct ndb_writer_ndb_meta ndb_meta;
};
};
@@ -242,6 +300,12 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
return 1;
}
+// Push a single message onto the writer's inbox queue and hand back the
+// result of prot_queue_push.
+static inline int ndb_writer_queue_msg(struct ndb_writer *writer,
+				       struct ndb_writer_msg *msg)
+{
+	int pushed = prot_queue_push(&writer->inbox, msg);
+
+	return pushed;
+}
+
static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
struct ndb_writer_msg *msgs,
int num_msgs)
@@ -720,6 +784,27 @@ static uint64_t ndb_write_note(struct ndb_lmdb *lmdb, MDB_txn *txn,
return note_key;
}
+// only to be called from the writer thread
+static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t version)
+{
+ int rc;
+ MDB_val key, val;
+ uint64_t version_key;
+
+ version_key = NDB_META_KEY_VERSION;
+
+ key.mv_data = &version_key;
+ key.mv_size = sizeof(version_key);
+ val.mv_data = &version;
+ val.mv_size = sizeof(version);
+
+ if ((rc = mdb_put(txn, lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
+ ndb_debug("write version to ndb_meta failed: %s\n",
+ mdb_strerror(rc));
+ return;
+ }
+}
+
static void *ndb_writer_thread(void *data)
{
struct ndb_writer *writer = data;
@@ -740,6 +825,7 @@ static void *ndb_writer_thread(void *data)
switch (msg->type) {
case NDB_WRITER_NOTE: any_note = 1; break;
case NDB_WRITER_PROFILE: any_note = 1; break;
+ case NDB_WRITER_DBMETA: any_note = 1; break;
case NDB_WRITER_QUIT: break;
}
}
@@ -773,6 +859,9 @@ static void *ndb_writer_thread(void *data)
case NDB_WRITER_NOTE:
ndb_write_note(writer->lmdb, txn, &msg->note);
break;
+ case NDB_WRITER_DBMETA:
+ ndb_write_version(writer->lmdb, txn, msg->ndb_meta.version);
+ break;
}
}
@@ -1033,7 +1122,56 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
return 1;
}
-int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads)
+// Queue an NDB_WRITER_DBMETA message carrying `version` on the writer's
+// inbox; returns the result of ndb_writer_queue_msg.
+static int ndb_queue_write_version(struct ndb *ndb, uint64_t version)
+{
+	struct ndb_writer_msg meta_msg = {
+		.type = NDB_WRITER_DBMETA,
+		.ndb_meta = { .version = version },
+	};
+
+	return ndb_writer_queue_msg(&ndb->writer, &meta_msg);
+}
+
+// Bring the database up to the latest schema version.
+//
+// The latest version is the number of entries in MIGRATIONS. When
+// ndb_db_version returns -1 the db is treated as fresh and is stamped with
+// the latest version without running any migration. (ndb_db_version also
+// returns -1 when it cannot start a transaction; the two cases are not
+// distinguished here.) Otherwise every pending migration runs in order and
+// the new version is queued for writing after each step that succeeds.
+//
+// ndb->version is set on every successful return.
+//
+// Returns 1 on success, 0 if a migration fails or a version write cannot be
+// queued.
+static int ndb_run_migrations(struct ndb *ndb)
+{
+	int stored;
+	uint64_t version, latest_version, i;
+
+	latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
+
+	// check the -1 sentinel while the value is still a signed int
+	stored = ndb_db_version(ndb);
+	if (stored == -1) {
+		// no version found. fresh db?
+		if (!ndb_queue_write_version(ndb, latest_version)) {
+			fprintf(stderr, "run_migrations: failed writing db version\n");
+			return 0;
+		}
+
+		ndb->version = (int)latest_version;
+		return 1;
+	}
+
+	version = stored;
+
+	if (version < latest_version) {
+		fprintf(stderr, "nostrdb: migrating v%d -> v%d\n",
+			(int)version, (int)latest_version);
+	}
+
+	for (i = version; i < latest_version; i++) {
+		if (!MIGRATIONS[i].fn(ndb)) {
+			fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1));
+			return 0;
+		}
+
+		// record progress after each step so a later failure
+		// does not redo migrations that already succeeded
+		if (!ndb_queue_write_version(ndb, i+1)) {
+			fprintf(stderr, "run_migrations: failed writing db version\n");
+			return 0;
+		}
+
+		version = i+1;
+	}
+
+	ndb->version = (int)version;
+
+	return 1;
+}
+
+int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads, int flags)
{
struct ndb *ndb;
//MDB_dbi ind_id; // TODO: ind_pk, etc
@@ -1058,6 +1196,11 @@ int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingest
return 0;
}
+ if (!ndb_flag_set(flags, NDB_FLAG_NOMIGRATE) && !ndb_run_migrations(ndb)) {
+ fprintf(stderr, "failed to run migrations\n");
+ return 0;
+ }
+
// Initialize LMDB environment and spin up threads
return 1;
}
diff --git a/nostrdb.h b/nostrdb.h
@@ -7,6 +7,8 @@
#define NDB_PACKED_STR 0x1
#define NDB_PACKED_ID 0x2
+// ndb_init flag: when set, ndb_init does not run database migrations
+#define NDB_FLAG_NOMIGRATE (1 << 0)
+
//#define DEBUG 1
#ifdef DEBUG
@@ -158,7 +160,8 @@ int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair);
int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[32], unsigned char signature[64]);
// NDB
-int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_threads);
+int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_threads, int flags);
+int ndb_db_version(struct ndb *ndb);
int ndb_process_event(struct ndb *, const char *json, int len);
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
int ndb_begin_query(struct ndb *, struct ndb_txn *);
diff --git a/test.c b/test.c
@@ -26,7 +26,7 @@ static void test_load_profiles()
mapsize = 1024 * 1024 * 100;
ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads));
+ assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
read_file("testdata/profiles.json", (unsigned char*)json, alloc_size, &written);
@@ -34,7 +34,7 @@ static void test_load_profiles()
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads));
+ assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
unsigned char id[32] = {
0x22, 0x05, 0x0b, 0x6d, 0x97, 0xbb, 0x9d, 0xa0, 0x9e, 0x90, 0xed, 0x0c,
0x6d, 0xd9, 0x5e, 0xed, 0x1d, 0x42, 0x3e, 0x27, 0xd5, 0xcb, 0xa5, 0x94,
@@ -58,11 +58,28 @@ static void test_fuzz_events() {
struct ndb *ndb;
const char *str = "[\"EVENT\"\"\"{\"content\"\"created_at\":0 \"id\"\"5086a8f76fe1da7fb56a25d1bebbafd70fca62e36a72c6263f900ff49b8f8604\"\"kind\":0 \"pubkey\":9c87f94bcbe2a837adc28d46c34eeaab8fc2e1cdf94fe19d4b99ae6a5e6acedc \"sig\"\"27374975879c94658412469cee6db73d538971d21a7b580726a407329a4cafc677fb56b946994cea59c3d9e118fef27e4e61de9d2c46ac0a65df14153 ea93cf5\"\"tags\"[[][\"\"]]}]";
- ndb_init(&ndb, test_dir, 1024 * 1024, 1);
+ ndb_init(&ndb, test_dir, 1024 * 1024, 1, 0);
ndb_process_event(ndb, str, strlen(str));
ndb_destroy(ndb);
}
+// Opens the v0 fixture database three times: once without migrating to
+// confirm it has no version record, once with migrations enabled, and once
+// more to confirm the stored version is now 0.
+static void test_migrate() {
+ static const char *v0_dir = "testdata/db/v0";
+ size_t mapsize = 1024ULL * 1024ULL * 1024ULL * 32ULL;
+ int threads = 2;
+ struct ndb *ndb;
+
+ // with NDB_FLAG_NOMIGRATE the db is left untouched: no version record yet
+ assert(ndb_init(&ndb, v0_dir, mapsize, threads, NDB_FLAG_NOMIGRATE));
+ assert(ndb_db_version(ndb) == -1);
+ ndb_destroy(ndb);
+
+ // open with migrations enabled, then close again
+ assert(ndb_init(&ndb, v0_dir, mapsize, threads, 0));
+ ndb_destroy(ndb);
+ // reopen and check the version that is now stored
+ assert(ndb_init(&ndb, v0_dir, mapsize, threads, 0));
+ assert(ndb_db_version(ndb) == 0);
+ ndb_destroy(ndb);
+}
+
static void test_basic_event() {
unsigned char buf[512];
struct ndb_builder builder, *b = &builder;
@@ -260,13 +277,13 @@ static void test_replacement()
mapsize = 1024 * 1024 * 100;
ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads));
+ assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
read_file("testdata/old-new.json", (unsigned char*)json, alloc_size, &written);
assert(ndb_process_events(ndb, json, written));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads));
+ assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
struct ndb_txn txn;
assert(ndb_begin_query(ndb, &txn));
@@ -304,14 +321,14 @@ static void test_fetch_last_noteid()
mapsize = 1024 * 1024 * 100;
ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads));
+ assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
read_file("testdata/random.json", (unsigned char*)json, alloc_size, &written);
assert(ndb_process_events(ndb, json, written));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads));
+ assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
unsigned char id[32] = { 0xdc, 0x96, 0x4f, 0x4c, 0x89, 0x83, 0x64, 0x13, 0x8e, 0x81, 0x96, 0xf0, 0xc7, 0x33, 0x38, 0xc8, 0xcc, 0x3e, 0xbf, 0xa3, 0xaf, 0xdd, 0xbc, 0x7d, 0xd1, 0x58, 0xb4, 0x84, 0x7c, 0x1e, 0xbf, 0xa0 };
@@ -652,6 +669,7 @@ static void test_fast_strchr()
}
int main(int argc, const char *argv[]) {
+ test_migrate();
test_basic_event();
test_empty_tags();
test_parse_json();