nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb

commit f4e2057db5e08efb6c400ee4c9fdeb308f14ffca
parent 264cf31f85a6444aa7b164156f59066fbbb09148
Author: William Casarin <jb55@jb55.com>
Date:   Sun, 26 Nov 2023 20:04:30 -0800

Initial nostrdb relay subscriptions

This adds some initial code for the nostrdb relay subscription monitor.

When new notes are written to the database, they are checked against the
active subscriptions. If any subscription matches, the note's primary key is
written to that subscription's inbox queue.

We also add an ndb_wait_for_notes() function that simply waits for notes
to be written by the subscription monitor.

Changelog-Added: Added filter subscriptions
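
For illustration, a minimal consumer-side sketch of the new API, modeled on the
test_subscriptions() case added in test.c below; the database directory and the
kind value used here are placeholders, and error handling is reduced to asserts:

#include <assert.h>
#include <stdio.h>
#include "nostrdb.h"

static void subscribe_sketch(void)
{
	struct ndb *ndb;
	struct ndb_config config;
	struct ndb_filter filter, *f = &filter;
	struct ndb_filter_group group;
	uint64_t subid, note_key = 0;

	ndb_default_config(&config);
	assert(ndb_init(&ndb, "./db", &config)); // "./db" is a placeholder dir

	// build a single filter matching kind-1 notes
	assert(ndb_filter_init(f));
	assert(ndb_filter_start_field(f, NDB_FILTER_KINDS));
	assert(ndb_filter_add_int_element(f, 1));
	ndb_filter_end_field(f);

	group.filters[0] = f;
	group.num_filters = 1;

	// register the filter group with the subscription monitor
	assert((subid = ndb_subscribe(ndb, &group)));

	// ... events are ingested elsewhere via ndb_process_event() ...

	// wait for the writer to notify us, then pop matching note keys
	if (ndb_wait_for_notes(ndb, subid, &note_key, 1))
		printf("matched note key %llu\n", (unsigned long long)note_key);
}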

Diffstat:
M TODO                  |   1 -
M src/nostrdb.c         | 243 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
M src/nostrdb.h         |  19 +++++++++++++++++++
M src/protected_queue.h |   4 +---
M test.c                |  32 ++++++++++++++++++++++++++++++++
5 files changed, 277 insertions(+), 22 deletions(-)

diff --git a/TODO b/TODO
@@ -1 +0,0 @@
-update url parsing code from damus
diff --git a/src/nostrdb.c b/src/nostrdb.c
@@ -33,9 +33,13 @@
 // the maximum number of things threads pop and push in bulk
 static const int THREAD_QUEUE_BATCH = 4096;
 
+// maximum number of active subscriptions
+#define MAX_SUBSCRIPTIONS 32
+
 // the maximum size of inbox queues
 static const int DEFAULT_QUEUE_SIZE = 1000000;
+
 // increase if we need bigger filters
 #define NDB_FILTER_PAGES 64
@@ -154,6 +158,7 @@ struct ndb_lmdb {
 
 struct ndb_writer {
 	struct ndb_lmdb *lmdb;
+	struct ndb_monitor *monitor;
 
 	void *queue_buf;
 	int queue_buflen;
@@ -170,16 +175,47 @@ struct ndb_ingester {
 	ndb_ingest_filter_fn filter;
 };
 
+struct ndb_subscription {
+	uint64_t subid;
+	struct ndb_filter_group filter;
+	struct prot_queue inbox;
+};
+
+struct ndb_monitor {
+	struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
+	int num_subscriptions;
+};
 
 struct ndb {
 	struct ndb_lmdb lmdb;
 	struct ndb_ingester ingester;
+	struct ndb_monitor monitor;
 	struct ndb_writer writer;
 	int version;
 	uint32_t flags; // setting flags
 	// lmdb environ handles, etc
 };
 
+// We get the KeyMatchResult function from the scan_cursor_type
+// This function is used to match the key for the corresponding cursor type.
+// For example, KIND scanners will look for a kind
+enum ndb_scan_cursor_type {
+	NDB_SCAN_KIND,
+	NDB_SCAN_PK_KIND,
+	NDB_SCAN_ID,
+};
+
+// same idea as DBScan::ScanCursor in strfry
+struct ndb_scan_cursor {
+	enum ndb_scan_cursor_type type;
+	int outstanding;
+};
+
+// same idea as DBScan in strfry
+struct ndb_dbscan {
+	struct ndb_scan_cursor cursors[12];
+	int num_cursors;
+};
 
 // A clustered key with an id and a timestamp
 struct ndb_tsid {
@@ -828,6 +864,38 @@ void ndb_filter_end_field(struct ndb_filter *filter)
 	filter->current = NULL;
 }
 
+static void ndb_filter_group_init(struct ndb_filter_group *group)
+{
+	group->num_filters = 0;
+}
+
+static int ndb_filter_group_add(struct ndb_filter_group *group,
+				struct ndb_filter *filter)
+{
+	if (group->num_filters + 1 >= NDB_MAX_FILTERS)
+		return 0;
+	group->filters[group->num_filters++] = filter;
+}
+
+static int ndb_filter_group_matches(struct ndb_filter_group *group,
+				    struct ndb_note *note)
+{
+	int i;
+	struct ndb_filter *filter;
+
+	if (group->num_filters == 0)
+		return 1;
+
+	for (i = 0; i < group->num_filters; i++) {
+		filter = group->filters[i];
+
+		if (ndb_filter_matches(filter, note))
+			return 1;
+	}
+
+	return 0;
+}
+
 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
 				uint64_t timestamp, const char *search)
 {
@@ -2137,6 +2205,23 @@ static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
 	return 1;
 }
 
+/*
+static int ndb_filter_query(struct ndb *ndb, struct ndb_filter *filter)
+{
+}
+
+static int ndb_filter_cursors(struct ndb_filter *filter, struct ndb_cursor)
+{
+}
+
+int ndb_query(struct ndb *ndb, struct ndb_filter **filters, int num_filters)
+{
+	struct ndb_filter_group group;
+	ndb_filter_group_init(&group);
+
+}
+*/
+
 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
 				     uint64_t note_key)
 {
@@ -2520,7 +2605,6 @@ int ndb_text_search(struct ndb_txn *txn, const char *query,
 	MDB_val k, v;
 	int i, j, keysize, saved_size, limit;
 	MDB_cursor_op op, order_op;
-	//int num_note_ids;
 
 	saved = NULL;
 	ndb_text_search_results_init(results);
@@ -2694,7 +2778,7 @@ static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note,
 	content = ndb_note_content(note);
 
 	if (!ndb_parse_content(scratch, scratch_size, content, content_len,
 			       &blocks)) {
-		ndb_debug("failed to parse content '%.*s'\n", content_len, content);
+		//ndb_debug("failed to parse content '%.*s'\n", content_len, content);
 		return 0;
 	}
@@ -2780,22 +2864,64 @@ static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
 }
 
+struct written_note {
+	uint64_t note_id;
+	struct ndb_writer_note *note;
+};
+
+// When the data has been committed to the database, take all of the written
+// notes, check them against subscriptions, and then write to the subscription
+// inbox for all matching notes
+static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
+				     struct written_note *wrote, int num_notes)
+{
+	int i, k;
+	struct written_note *written;
+	struct ndb_note *note;
+	struct ndb_subscription *sub;
+
+	for (i = 0; i < monitor->num_subscriptions; i++) {
+		sub = &monitor->subscriptions[i];
+		ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
+
+		for (k = 0; k < num_notes; k++) {
+			written = &wrote[k];
+			note = written->note->note;
+
+			if (ndb_filter_group_matches(&sub->filter, note)) {
+				ndb_debug("pushing note\n");
+				if (!prot_queue_push(&sub->inbox, &written->note_id)) {
+					ndb_debug("couldn't push note to subscriber");
+				}
+			} else {
+				ndb_debug("not pushing note\n");
+			}
		}

+	}
+}
+
 static void *ndb_writer_thread(void *data)
 {
 	struct ndb_writer *writer = data;
 	struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
-	// 8mb scratch buffer for parsing note content
-	size_t scratch_size = 8 * 1024 * 1024;
-	unsigned char *scratch = malloc(scratch_size);
-	int i, popped, done, any_note;
+	struct written_note written_notes[THREAD_QUEUE_BATCH];
+	size_t scratch_size;
+	int i, popped, done, any_note, num_notes;
 	uint64_t note_nkey;
-	MDB_txn *mdb_txn = NULL;
 	struct ndb_txn txn;
+	unsigned char *scratch;
+
+	// 8mb scratch buffer for parsing note content
+	scratch_size = 8 * 1024 * 1024;
+	scratch = malloc(scratch_size);
+	MDB_txn *mdb_txn = NULL;
 	ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
 
 	done = 0;
 	while (!done) {
 		txn.mdb_txn = NULL;
+		num_notes = 0;
 		popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
 		//ndb_debug("writer popped %d items\n", popped);
@@ -2817,7 +2943,7 @@ static void *ndb_writer_thread(void *data)
 			fprintf(stderr, "writer thread txn_begin failed");
 			// should definitely not happen unless DB is full
 			// or something ?
-			assert(false);
+			continue;
 		}
 
 		for (i = 0; i < popped; i++) {
@@ -2839,11 +2965,17 @@ static void *ndb_writer_thread(void *data)
 				}
 				break;
 			case NDB_WRITER_NOTE:
-				ndb_write_note(&txn, &msg->note, scratch,
-					       scratch_size);
-				//printf("wrote note ");
-				//print_hex(msg->note.note->id, 32);
-				//printf("\n");
+				note_nkey = ndb_write_note(&txn, &msg->note,
+							   scratch,
+							   scratch_size);
+
+				ndb_debug("note_nkey %" PRIu64 "\n", note_nkey);
+				if (note_nkey > 0) {
+					written_notes[num_notes++] = (struct written_note){
+						.note_id = note_nkey,
+						.note = &msg->note,
+					};
+				}
 				break;
 			case NDB_WRITER_DBMETA:
 				ndb_write_version(&txn, msg->ndb_meta.version);
@@ -2862,9 +2994,16 @@ static void *ndb_writer_thread(void *data)
 		}
 
 		// commit writes
-		if (any_note && !ndb_end_query(&txn)) {
-			fprintf(stderr, "writer thread txn commit failed");
-			assert(false);
+		if (any_note) {
+			if (!ndb_end_query(&txn)) {
+				ndb_debug("writer thread txn commit failed\n");
+			} else {
+				ndb_debug("notifying subscriptions\n");
+				ndb_notify_subscriptions(writer->monitor,
+							 written_notes,
+							 num_notes);
+				// update subscriptions
+			}
 		}
 
 		// free notes
@@ -2958,9 +3097,11 @@ static void *ndb_ingester_thread(void *data)
 }
 
 
-static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb)
+static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
+			   struct ndb_monitor *monitor)
 {
 	writer->lmdb = lmdb;
+	writer->monitor = monitor;
 	writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
 	writer->queue_buf = malloc(writer->queue_buflen);
 	if (writer->queue_buf == NULL) {
@@ -3214,6 +3355,11 @@ static int ndb_run_migrations(struct ndb *ndb)
 	return 1;
 }
 
+static void ndb_monitor_init(struct ndb_monitor *monitor)
+{
+	monitor->num_subscriptions = 0;
+}
+
 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config)
 {
 	struct ndb *ndb;
@@ -3230,7 +3376,9 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
 	if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
 		return 0;
 
-	if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) {
+	ndb_monitor_init(&ndb->monitor);
+
+	if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) {
 		fprintf(stderr, "ndb_writer_init failed\n");
 		return 0;
 	}
@@ -4748,3 +4896,62 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
 
 	return blocks;
 }
+
+struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid)
+{
+	struct ndb_subscription *sub, *tsub;
+	int i;
+
+	for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) {
+		tsub = &ndb->monitor.subscriptions[i];
+		if (tsub->subid == subid) {
+			sub = tsub;
+			break;
+		}
+	}
+
+	return sub;
+}
+
+int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
+		       int note_id_capacity)
+{
+	struct ndb_subscription *sub;
+	if (!(sub = ndb_find_subscription(ndb, subid)))
+		return 0;
+
+	return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
+}
+
+uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter_group *group)
+{
+	static uint64_t subids = 0;
+	struct ndb_subscription *sub;
+	int index;
+	size_t buflen;
+	uint64_t subid;
+	char *buf;
+
+	if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
+		fprintf(stderr, "too many subscriptions\n");
+		return 0;
+	}
+
+	index = ndb->monitor.num_subscriptions++;
+	sub = &ndb->monitor.subscriptions[index];
+	subid = ++subids;
+	sub->subid = subid;
+
+	memcpy(&sub->filter, group, sizeof(*group));
+
+	// 500k ought to be enough for anyone
+	buflen = sizeof(uint64_t) * 65536;
+	buf = malloc(buflen);
+
+	if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
+		fprintf(stderr, "failed to push prot queue\n");
+		return 0;
+	}
+
+	return subid;
+}
diff --git a/src/nostrdb.h b/src/nostrdb.h
@@ -4,6 +4,10 @@
 #include <inttypes.h>
 #include "cursor.h"
 
+// how many filters are allowed in a filter group
+#define NDB_MAX_FILTERS 16
+
+// maximum number of filters allowed in a filter group
 #define NDB_PACKED_STR 0x1
 #define NDB_PACKED_ID 0x2
@@ -26,6 +30,7 @@ struct ndb_blocks;
 struct ndb_block;
 struct ndb_note;
 struct ndb_tag;
+struct ndb_filter_group;
 struct ndb_tags;
 struct ndb_lmdb;
 union ndb_packed_str;
@@ -236,6 +241,11 @@ struct ndb_filter {
 	struct ndb_filter_elements *elements[NDB_NUM_FILTERS];
 };
 
+struct ndb_filter_group {
+	struct ndb_filter *filters[NDB_MAX_FILTERS];
+	int num_filters;
+};
+
 struct ndb_config {
 	int flags;
 	int ingester_threads;
@@ -462,12 +472,21 @@ void ndb_filter_reset(struct ndb_filter *);
 void ndb_filter_end_field(struct ndb_filter *);
 void ndb_filter_free(struct ndb_filter *);
 
+// SUBSCRIPTIONS
+uint64_t ndb_subscribe(struct ndb *, struct ndb_filter_group *);
+int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids,
+		       int note_id_capacity);
+int ndb_unsubscribe(int subid);
+
 // FULLTEXT SEARCH
 int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
 void ndb_default_text_search_config(struct ndb_text_search_config *);
 void ndb_text_search_config_set_order(struct ndb_text_search_config *, enum ndb_search_order);
 void ndb_text_search_config_set_limit(struct ndb_text_search_config *, int limit);
 
+// QUERY
+void ndb_query(struct ndb_filter **, int num_filters);
+
 // STATS
 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat);
 void ndb_stat_counts_init(struct ndb_stat_counts *counts);
diff --git a/src/protected_queue.h b/src/protected_queue.h
@@ -19,8 +19,6 @@
 #include "cursor.h"
 #include "util.h"
 
-#define BUFFER_SIZE 100
-
 /*
  * The prot_queue structure represents a thread-safe queue that can hold
  * generic data elements.
@@ -53,7 +51,7 @@ static inline int prot_queue_init(struct prot_queue* q, void* buf,
 {
 	// buffer elements must fit nicely in the buffer
 	if (buflen == 0 || buflen % elem_size != 0)
-		return 0;
+		assert(!"queue elements don't fit nicely");
 
 	q->head = 0;
 	q->tail = 0;
diff --git a/test.c b/test.c
@@ -1203,9 +1203,41 @@ static int test_varints() {
 	return 0;
 }
 
+static void test_subscriptions()
+{
+	struct ndb *ndb;
+	struct ndb_config config;
+	struct ndb_filter filter, *f = &filter;
+	uint64_t subid;
+	uint64_t note_id = 0;
+	struct ndb_filter_group group;
+	ndb_default_config(&config);
+
+	const char *ev = "[\"EVENT\",\"s\",{\"id\": \"3718b368de4d01a021990e6e00dce4bdf860caed21baffd11b214ac498e7562e\",\"pubkey\": \"57c811c86a871081f52ca80e657004fe0376624a978f150073881b6daf0cbf1d\",\"created_at\": 1704300579,\"kind\": 1337,\"tags\": [],\"content\": \"test\",\"sig\": \"061c36d4004d8342495eb22e8e7c2e2b6e1a1c7b4ae6077fef09f9a5322c561b88bada4f63ff05c9508cb29d03f50f71ef3c93c0201dbec440fc32eda87f273b\"}]";
+
+	assert(ndb_init(&ndb, test_dir, &config));
+
+	assert(ndb_filter_init(f));
+	assert(ndb_filter_start_field(f, NDB_FILTER_KINDS));
+	assert(ndb_filter_add_int_element(f, 1337));
+	ndb_filter_end_field(f);
+
+	group.filters[0] = f;
+	group.num_filters = 1;
+
+	assert((subid = ndb_subscribe(ndb, &group)));
+
+	assert(ndb_process_event(ndb, ev, strlen(ev)));
+
+	assert(ndb_wait_for_notes(ndb, subid, &note_id, 1));
+
+	assert(note_id > 0);
+}
+
 int main(int argc, const char *argv[]) {
 	test_parse_content();
 	test_url_parsing();
+	test_subscriptions();
 	test_comma_url_parsing();
 	test_varints();
 	test_bech32_objects();
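
Note that the subscription inbox only carries note primary keys. Below is a
hedged sketch of resolving those keys into notes, assuming the pre-existing
ndb_begin_query()/ndb_get_note_by_key()/ndb_note_content() read API from
nostrdb.h (none of which are introduced by this diff; check the header for the
exact signatures):

#include <stdio.h>
#include "nostrdb.h"

// Resolve keys returned by ndb_wait_for_notes() into notes and print their
// content. ndb_begin_query/ndb_get_note_by_key are assumed to exist in the
// surrounding nostrdb API rather than being added by this commit.
static void print_notes(struct ndb *ndb, uint64_t *keys, int num_keys)
{
	struct ndb_txn txn;
	struct ndb_note *note;
	size_t len;
	int i;

	if (!ndb_begin_query(ndb, &txn))
		return;

	for (i = 0; i < num_keys; i++) {
		if ((note = ndb_get_note_by_key(&txn, keys[i], &len)))
			printf("%s\n", ndb_note_content(note));
	}

	ndb_end_query(&txn);
}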