commit c0d2b49ab813010ed18bd9cf8980141a30f591ad
parent 7542f7bec311c99db662f98efff94518066c0082
Author: William Casarin <jb55@jb55.com>
Date: Fri, 1 Dec 2023 13:10:57 -0800
ingest: add configurable ingest filter
This allows users of nostrdb to selectively filter notes of any kind during
ingest.
Contact lists too big? Create a filter to reject them.
You only care about notes with specific kinds? Reject everything else.
Damus will use this for rejecting large events that might take up too
much space for storage, such as contact lists.
This commit also switched to ndb_config for configuring nostrdb, because
the arguments to ndb_init were getting out of hand.
Changelog-Added: Added ingest filter setting
Changelog-Changed: Switch to ndb_config for per-session ndb settings
Diffstat:
M | bench-ingest-many.c | | | 14 | ++++++++------ |
M | nostrdb.c | | | 86 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------- |
M | nostrdb.h | | | 25 | +++++++++++++++++++++++-- |
M | test.c | | | 97 | ++++++++++++++++++++++++++++++++++++++----------------------------------------- |
4 files changed, 144 insertions(+), 78 deletions(-)
diff --git a/bench-ingest-many.c b/bench-ingest-many.c
@@ -27,18 +27,20 @@ int map_file(const char *filename, unsigned char **p, size_t *flen)
static int bench_parser()
{
- int ingester_threads;
long nanos, ms;
- size_t written, mapsize;
+ size_t written;
struct ndb *ndb;
struct timespec t1, t2;
char *json;
int times = 1;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024ULL * 1024ULL * 400ULL * 10ULL;
- ingester_threads = 8;
- assert(ndb_init(&ndb, "testdata/db", mapsize, ingester_threads,
- NDB_FLAG_SKIP_NOTE_VERIFY));
+ ndb_config_set_mapsize(&config, 1024ULL * 1024ULL * 400ULL * 10ULL);
+ ndb_config_set_ingest_threads(&config, 8);
+ ndb_config_set_flags(&config, NDB_FLAG_SKIP_NOTE_VERIFY);
+
+ assert(ndb_init(&ndb, "testdata/db", &config));
const char *filename = "testdata/many-events.json";
if (!map_file(filename, (unsigned char**)&json, &written)) {
printf("mapping testdata/many-events.json failed\n");
diff --git a/nostrdb.c b/nostrdb.c
@@ -7,6 +7,7 @@
#include "sha256.h"
#include "lmdb.h"
#include "util.h"
+#include "cpu.h"
#include "threadpool.h"
#include "protected_queue.h"
#include "memchr.h"
@@ -115,6 +116,8 @@ struct ndb_ingester {
uint32_t flags;
struct threadpool tp;
struct ndb_writer *writer;
+ void *filter_context;
+ ndb_ingest_filter_fn filter;
};
@@ -1577,16 +1580,23 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
struct ndb_note *note,
size_t note_size,
struct ndb_writer_msg *out,
- uint32_t flags)
+ struct ndb_ingester *ingester)
{
- //printf("ndb_ingester_process_note ");
- //print_hex(note->id, 32);
- //printf("\n");
+ enum ndb_ingest_filter_action action;
+ action = NDB_INGEST_ACCEPT;
+
+ if (ingester->filter)
+ action = ingester->filter(ingester->filter_context, note);
+
+ if (action == NDB_INGEST_REJECT)
+ return 0;
// some special situations we might want to skip sig validation,
// like during large imports
- if (!(flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
- // Verify! If it's an invalid note we don't need to
+ if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
+ // if we're skipping validation we don't need to verify
+ } else {
+ // verify! If it's an invalid note we don't need to
// bother writing it to the database
if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
ndb_debug("signature verification failed\n");
@@ -1678,7 +1688,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
}
if (!ndb_ingester_process_note(ctx, note, note_size,
- out, ingester->flags)) {
+ out, ingester)) {
goto cleanup;
} else {
// we're done with the original json, free it
@@ -1699,7 +1709,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
}
if (!ndb_ingester_process_note(ctx, note, note_size,
- out, ingester->flags)) {
+ out, ingester)) {
goto cleanup;
} else {
// we're done with the original json, free it
@@ -3000,8 +3010,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb)
// initialize the ingester queue and then spawn the thread
static int ndb_ingester_init(struct ndb_ingester *ingester,
- struct ndb_writer *writer, int num_threads,
- int flags)
+ struct ndb_writer *writer,
+ struct ndb_config *config)
{
int elem_size, num_elems;
static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT };
@@ -3011,10 +3021,13 @@ static int ndb_ingester_init(struct ndb_ingester *ingester,
num_elems = DEFAULT_QUEUE_SIZE;
ingester->writer = writer;
- ingester->flags = flags;
+ ingester->flags = config->flags;
+ ingester->filter = config->ingest_filter;
+ ingester->filter_context = config->filter_context;
- if (!threadpool_init(&ingester->tp, num_threads, elem_size, num_elems,
- &quit_msg, ingester, ndb_ingester_thread))
+ if (!threadpool_init(&ingester->tp, config->ingester_threads,
+ elem_size, num_elems, &quit_msg, ingester,
+ ndb_ingester_thread))
{
fprintf(stderr, "ndb ingester threadpool failed to init\n");
return 0;
@@ -3221,20 +3234,20 @@ static int ndb_run_migrations(struct ndb *ndb)
return 1;
}
-int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads, int flags)
+int ndb_init(struct ndb **pndb, const char *filename, struct ndb_config *config)
{
struct ndb *ndb;
//MDB_dbi ind_id; // TODO: ind_pk, etc
ndb = *pndb = calloc(1, sizeof(struct ndb));
- ndb->flags = flags;
+ ndb->flags = config->flags;
if (ndb == NULL) {
fprintf(stderr, "ndb_init: malloc failed\n");
return 0;
}
- if (!ndb_init_lmdb(filename, &ndb->lmdb, mapsize))
+ if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
return 0;
if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) {
@@ -3242,14 +3255,14 @@ int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingest
return 0;
}
- if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads,
- ndb->flags)) {
+ if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
- ingester_threads);
+ config->ingester_threads);
return 0;
}
- if (!ndb_flag_set(flags, NDB_FLAG_NOMIGRATE) && !ndb_run_migrations(ndb)) {
+ if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) &&
+ !ndb_run_migrations(ndb)) {
fprintf(stderr, "failed to run migrations\n");
return 0;
}
@@ -4380,4 +4393,37 @@ inline int ndb_builder_push_tag_str(struct ndb_builder *builder,
return ndb_builder_finalize_tag(builder, pstr);
}
+//
+// CONFIG
+//
+// Fill every field of *config with nostrdb's baseline settings: a 32 GiB map
+// size, 4 ingester threads, no flags and no ingest filter. Use the
+// ndb_config_set_* functions afterwards to override individual fields.
+void ndb_default_config(struct ndb_config *config)
+{
+ // ULL keeps the product in (at least) 64-bit arithmetic; with UL the
+ // multiplication wraps to 0 wherever unsigned long is 32 bits wide
+ config->mapsize = 1024ULL * 1024ULL * 1024ULL * 32ULL; // 32 GiB
+ config->ingester_threads = 4; // TODO: figure this out from platform apis
+ config->flags = 0;
+ config->ingest_filter = NULL;
+ config->filter_context = NULL;
+}
+// Set how many threads the ingester threadpool is initialized with.
+//
+// config:  configuration to modify
+// threads: requested ingester thread count
+void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
+{
+ // store exactly what the caller asked for; the value must not be
+ // replaced by a detected core count or the setter has no effect
+ config->ingester_threads = threads;
+}
+
+// Replace the flag bits stored in *config with `flags`. The previous value is
+// overwritten, not OR-ed, so pass the complete set of flags wanted.
+void ndb_config_set_flags(struct ndb_config *config, int flags)
+{
+ config->flags = flags;
+}
+
+// Record the map size that ndb_init hands to the LMDB setup.
+//
+// config:  configuration to modify
+// mapsize: requested map size in bytes
+void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize)
+{
+ config->mapsize = mapsize;
+}
+
+// Install an ingest filter callback together with its context pointer.
+//
+// config:     configuration to modify
+// fn:         callback the ingester invokes on each note; NULL means notes
+//             are not filtered
+// filter_ctx: opaque pointer passed back to `fn` as its first argument
+void ndb_config_set_ingest_filter(struct ndb_config *config,
+ ndb_ingest_filter_fn fn, void *filter_ctx)
+{
+ config->filter_context = filter_ctx;
+ config->ingest_filter = fn;
+}
diff --git a/nostrdb.h b/nostrdb.h
@@ -105,6 +105,12 @@ enum tce_type {
NDB_TCE_EOSE = 0x4,
};
+enum ndb_ingest_filter_action { // verdict returned by an ndb_ingest_filter_fn
+ NDB_INGEST_REJECT, // drop the note: the ingester stops processing it before signature verification
+ NDB_INGEST_ACCEPT, // keep processing the note; also the verdict assumed when no filter is installed
+ NDB_INGEST_SKIP_VALIDATION // keep processing the note but skip the ndb_note_verify signature check
+};
+
// function pointer for controlling what to do after we parse an id
typedef enum ndb_idres (*ndb_id_fn)(void *, const char *);
@@ -233,6 +239,8 @@ struct ndb_note {
#pragma pack(pop)
+typedef enum ndb_ingest_filter_action (*ndb_ingest_filter_fn)(void *, struct ndb_note *); // ingest filter callback: (configured filter context, parsed note) -> verdict
+
struct ndb_builder {
struct cursor mem;
struct cursor note_cur;
@@ -295,6 +303,20 @@ struct ndb_filter {
struct ndb_filter_elements *elements[NDB_NUM_FILTERS];
};
+struct ndb_config { // per-session settings handed to ndb_init
+ int flags; // NDB_FLAG_* bits, e.g. NDB_FLAG_SKIP_NOTE_VERIFY or NDB_FLAG_NOMIGRATE
+ int ingester_threads; // thread count given to the ingester threadpool
+ size_t mapsize; // map size in bytes, forwarded to the LMDB initialization
+ void *filter_context; // opaque pointer passed as the first argument of ingest_filter
+ ndb_ingest_filter_fn ingest_filter; // optional per-note filter; NULL means every note is accepted
+};
+
+// CONFIG
+void ndb_default_config(struct ndb_config *);
+void ndb_config_set_ingest_threads(struct ndb_config *config, int threads);
+void ndb_config_set_flags(struct ndb_config *config, int flags);
+void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize);
+void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *);
// HELPERS
int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen);
@@ -304,7 +326,7 @@ int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair);
int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[32], unsigned char signature[64]);
// NDB
-int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_threads, int flags);
+int ndb_init(struct ndb **ndb, const char *dbdir, struct ndb_config *);
int ndb_db_version(struct ndb *ndb);
int ndb_process_event(struct ndb *, const char *json, int len);
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
@@ -354,7 +376,6 @@ void ndb_filter_reset(struct ndb_filter *);
void ndb_filter_end_field(struct ndb_filter *);
void ndb_filter_free(struct ndb_filter *filter);
-
// FULLTEXT SEARCH
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *);
diff --git a/test.c b/test.c
@@ -118,15 +118,12 @@ static void test_filters()
static void test_fetched_at()
{
struct ndb *ndb;
- size_t mapsize;
- int ingester_threads;
struct ndb_txn txn;
uint64_t fetched_at, t1, t2;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
-
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
const unsigned char pubkey[] = { 0x87, 0xfb, 0xc6, 0xd5, 0x98, 0x31, 0xa8, 0x23, 0xa4, 0x5d, 0x10, 0x1f,
0x86, 0x94, 0x2c, 0x41, 0xcd, 0xe2, 0x90, 0x23, 0xf4, 0x09, 0x20, 0x24,
@@ -174,22 +171,21 @@ static void test_reaction_counter()
static const int alloc_size = 1024 * 1024;
char *json = malloc(alloc_size);
struct ndb *ndb;
- size_t mapsize, len;
+ size_t len;
void *root;
- int written, ingester_threads, reactions;
+ int written, reactions;
NdbEventMeta_table_t meta;
struct ndb_txn txn;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
-
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
read_file("testdata/reactions.json", (unsigned char*)json, alloc_size, &written);
assert(ndb_process_client_events(ndb, json, written));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
assert(ndb_begin_query(ndb, &txn));
@@ -247,16 +243,15 @@ static void test_profile_updates()
static const int alloc_size = 1024 * 1024;
char *json = malloc(alloc_size);
struct ndb *ndb;
- size_t mapsize, len;
+ size_t len;
void *record;
- int written, ingester_threads;
+ int written;
struct ndb_txn txn;
uint64_t key;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
-
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
read_file("testdata/profile-updates.json", (unsigned char*)json, alloc_size, &written);
@@ -264,7 +259,7 @@ static void test_profile_updates()
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
assert(ndb_begin_query(ndb, &txn));
const unsigned char pk[32] = {
@@ -293,12 +288,11 @@ static void test_load_profiles()
static const int alloc_size = 1024 * 1024;
char *json = malloc(alloc_size);
struct ndb *ndb;
- size_t mapsize;
- int written, ingester_threads;
+ int written;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
read_file("testdata/profiles.json", (unsigned char*)json, alloc_size, &written);
@@ -306,7 +300,7 @@ static void test_load_profiles()
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
unsigned char id[32] = {
0x22, 0x05, 0x0b, 0x6d, 0x97, 0xbb, 0x9d, 0xa0, 0x9e, 0x90, 0xed, 0x0c,
0x6d, 0xd9, 0x5e, 0xed, 0x1d, 0x42, 0x3e, 0x27, 0xd5, 0xcb, 0xa5, 0x94,
@@ -330,26 +324,31 @@ static void test_load_profiles()
static void test_fuzz_events() {
struct ndb *ndb;
const char *str = "[\"EVENT\"\"\"{\"content\"\"created_at\":0 \"id\"\"5086a8f76fe1da7fb56a25d1bebbafd70fca62e36a72c6263f900ff49b8f8604\"\"kind\":0 \"pubkey\":9c87f94bcbe2a837adc28d46c34eeaab8fc2e1cdf94fe19d4b99ae6a5e6acedc \"sig\"\"27374975879c94658412469cee6db73d538971d21a7b580726a407329a4cafc677fb56b946994cea59c3d9e118fef27e4e61de9d2c46ac0a65df14153 ea93cf5\"\"tags\"[[][\"\"]]}]";
+ struct ndb_config config;
+ ndb_default_config(&config);
- ndb_init(&ndb, test_dir, 1024 * 1024, 1, 0);
+ ndb_init(&ndb, test_dir, &config);
ndb_process_event(ndb, str, strlen(str));
ndb_destroy(ndb);
}
static void test_migrate() {
static const char *v0_dir = "testdata/db/v0";
- size_t mapsize = 1024ULL * 1024ULL * 1024ULL * 32ULL;
- int threads = 2;
struct ndb *ndb;
+ struct ndb_config config;
+ ndb_default_config(&config);
+ ndb_config_set_flags(&config, NDB_FLAG_NOMIGRATE);
fprintf(stderr, "testing migrate on v0\n");
- assert(ndb_init(&ndb, v0_dir, mapsize, threads, NDB_FLAG_NOMIGRATE));
+ assert(ndb_init(&ndb, v0_dir, &config));
assert(ndb_db_version(ndb) == 0);
ndb_destroy(ndb);
- assert(ndb_init(&ndb, v0_dir, mapsize, threads, 0));
+ ndb_config_set_flags(&config, 0);
+
+ assert(ndb_init(&ndb, v0_dir, &config));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, v0_dir, mapsize, threads, 0));
+ assert(ndb_init(&ndb, v0_dir, &config));
assert(ndb_db_version(ndb) == 3);
test_profile_search(ndb);
@@ -548,18 +547,18 @@ static void test_replacement()
char *json = malloc(alloc_size);
unsigned char *buf = malloc(alloc_size);
struct ndb *ndb;
- size_t mapsize, len;
- int written, ingester_threads;
+ size_t len;
+ int written;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
read_file("testdata/old-new.json", (unsigned char*)json, alloc_size, &written);
assert(ndb_process_events(ndb, json, written));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
struct ndb_txn txn;
assert(ndb_begin_query(ndb, &txn));
@@ -590,19 +589,19 @@ static void test_fetch_last_noteid()
char *json = malloc(alloc_size);
unsigned char *buf = malloc(alloc_size);
struct ndb *ndb;
- size_t mapsize, len;
- int written, ingester_threads;
+ size_t len;
+ int written;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
read_file("testdata/random.json", (unsigned char*)json, alloc_size, &written);
assert(ndb_process_events(ndb, json, written));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
unsigned char id[32] = { 0xdc, 0x96, 0x4f, 0x4c, 0x89, 0x83, 0x64, 0x13, 0x8e, 0x81, 0x96, 0xf0, 0xc7, 0x33, 0x38, 0xc8, 0xcc, 0x3e, 0xbf, 0xa3, 0xaf, 0xdd, 0xbc, 0x7d, 0xd1, 0x58, 0xb4, 0x84, 0x7c, 0x1e, 0xbf, 0xa0 };
@@ -945,22 +944,20 @@ static void test_fast_strchr()
static void test_fulltext()
{
struct ndb *ndb;
- size_t mapsize;
- int ingester_threads;
struct ndb_txn txn;
int written;
static const int alloc_size = 2 << 18;
char *json = malloc(alloc_size);
struct ndb_text_search_results results;
+ struct ndb_config config;
+ ndb_default_config(&config);
- mapsize = 1024 * 1024 * 100;
- ingester_threads = 1;
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
read_file("testdata/search.json", (unsigned char*)json, alloc_size, &written);
assert(ndb_process_client_events(ndb, json, written));
ndb_destroy(ndb);
- assert(ndb_init(&ndb, test_dir, mapsize, ingester_threads, 0));
+ assert(ndb_init(&ndb, test_dir, &config));
ndb_begin_query(ndb, &txn);
ndb_text_search(&txn, "Jump Over", &results);