nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit f243083f9231139da7708d0e1359a98e18e0e5bb
parent d6b14c0fc3010e640246018b4ec2a52f8cd4973f
Author: William Casarin <jb55@jb55.com>
Date:   Fri,  9 Feb 2024 13:58:41 -0800

filter: use relative data offsets for easy cloning

Instead of storing exact pointers inside of our filter elements, just
store offsets. This will allow us to clone filters very easily without
having to mess around with fixing up the pointers afterwards.

Signed-off-by: William Casarin <jb55@jb55.com>

Diffstat:
Msrc/nostrdb.c | 302++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
Msrc/nostrdb.h | 12+++++++++---
Mtest.c | 16+++++++++-------
3 files changed, 203 insertions(+), 127 deletions(-)

diff --git a/src/nostrdb.c b/src/nostrdb.c @@ -14,6 +14,7 @@ #include "threadpool.h" #include "protected_queue.h" #include "memchr.h" +#include "print_util.h" #include <stdlib.h> #include <limits.h> #include <assert.h> @@ -517,12 +518,10 @@ static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) { // are as small as possible. This also prevents new fields from being added int ndb_filter_end(struct ndb_filter *filter) { - int i, k; #ifdef DEBUG size_t orig_size; #endif size_t data_len, elem_len; - struct ndb_filter_elements *els; if (filter->finalized == 1) return 0; @@ -534,28 +533,6 @@ int ndb_filter_end(struct ndb_filter *filter) orig_size = filter->data_buf.end - filter->elem_buf.start; #endif - // first we delta-ize the element pointers so that they are relative to - // the start of the delta buf. we will re-add these after we move the - // memory - for (i = 0; i < filter->num_elements; i++) { - els = filter->elements[i]; - - // realloc could move this whole thing, so subtract - // the element pointers as well - filter->elements[i] = (struct ndb_filter_elements *) - ((unsigned char *)filter->elements[i] - - filter->elem_buf.start); - - if (!ndb_filter_elem_is_ptr(&els->field)) - continue; - - for (k = 0; k < els->count; k++) { - els->elements[k].id = (unsigned char *) - ((unsigned char *)els->elements[k].id - - filter->data_buf.start); - } - } - // cap the elem buff filter->elem_buf.end = filter->elem_buf.p; @@ -571,23 +548,6 @@ int ndb_filter_end(struct ndb_filter *filter) filter->data_buf.end = filter->data_buf.start + data_len; filter->data_buf.p = filter->data_buf.end; - // un-deltaize the pointers - for (i = 0; i < filter->num_elements; i++) { - filter->elements[i] = (struct ndb_filter_elements *) - ((unsigned char *)filter->elem_buf.start + - (size_t)filter->elements[i]); - els = filter->elements[i]; - - if (!ndb_filter_elem_is_ptr(&els->field)) - continue; - - for (k = 0; k < els->count; k++) { - els->elements[k].id = - (size_t)els->elements[k].id + - filter->data_buf.start; - } - } - filter->finalized = 1; ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len); @@ -595,6 +555,67 @@ int ndb_filter_end(struct ndb_filter *filter) return 1; } +static inline struct ndb_filter_elements * +ndb_filter_get_elements_by_offset(struct ndb_filter *filter, int offset) +{ + struct ndb_filter_elements *els; + + if (offset < 0) + return NULL; + + els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset); + + if ((unsigned char *)els > filter->elem_buf.p) + return NULL; + + return els; +} + +struct ndb_filter_elements * +ndb_filter_current_element(struct ndb_filter *filter) +{ + return ndb_filter_get_elements_by_offset(filter, filter->current); +} + +static inline struct ndb_filter_elements * +ndb_filter_get_elements(struct ndb_filter *filter, int index) +{ + if (filter->num_elements <= 0) + return NULL; + + if (index > filter->num_elements-1) + return NULL; + + return ndb_filter_get_elements_by_offset(filter, filter->elements[index]); +} + +static inline unsigned char * +ndb_filter_elements_data(struct ndb_filter *filter, int offset) +{ + unsigned char *data; + + if (offset < 0) + return NULL; + + data = filter->data_buf.start + offset; + if (data > filter->data_buf.p) + return NULL; + + return data; +} + +static inline unsigned char * +ndb_filter_get_id_element(struct ndb_filter *filter, struct ndb_filter_elements *els, int index) +{ + return ndb_filter_elements_data(filter, els->elements[index]); +} + +static inline const char * +ndb_filter_get_string_element(struct ndb_filter *filter, struct ndb_filter_elements *els, int index) +{ + return (const char *)ndb_filter_elements_data(filter, els->elements[index]); +} + int ndb_filter_init(struct ndb_filter *filter) { struct cursor cur; @@ -622,8 +643,8 @@ int ndb_filter_init(struct ndb_filter *filter) assert(filter->elem_buf.start == cur.start); filter->num_elements = 0; - filter->elements[0] = (struct ndb_filter_elements*) buf; - filter->current = NULL; + filter->elements[0] = 0; + filter->current = -1; filter->finalized = 0; return 1; @@ -657,14 +678,15 @@ static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filte int i; struct ndb_filter_elements *els, *el; - if (filter->current) { + if (ndb_filter_current_element(filter)) { fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); return 0; } // you can only start and end fields once for (i = 0; i < filter->num_elements; i++) { - el = filter->elements[i]; + el = ndb_filter_get_elements(filter, i); + assert(el); if (el->field.type == field) { fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", ndb_filter_field_name(field)); @@ -672,8 +694,9 @@ static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filte } } - els = (struct ndb_filter_elements *) filter->elem_buf.p ; - filter->current = els; + filter->current = filter->elem_buf.p - filter->elem_buf.start; + els = ndb_filter_current_element(filter); + assert(els); // advance elem buffer to the variable data section if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { @@ -702,43 +725,44 @@ int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag) static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) { - unsigned char *data; - const char *str; + struct ndb_filter_elements *current; + uint64_t offset; - if (!filter->current) + if (!(current = ndb_filter_current_element(filter))) return 0; - data = filter->data_buf.p; + offset = filter->data_buf.p - filter->data_buf.start; - switch (filter->current->field.type) { + switch (current->field.type) { case NDB_FILTER_IDS: case NDB_FILTER_AUTHORS: if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) return 0; - el.id = data; break; case NDB_FILTER_KINDS: + offset = el.integer; break; case NDB_FILTER_SINCE: case NDB_FILTER_UNTIL: case NDB_FILTER_LIMIT: // only one allowed for since/until - if (filter->current->count != 0) + if (current->count != 0) return 0; + offset = el.integer; break; case NDB_FILTER_TAGS: - str = (const char *)filter->data_buf.p; if (!cursor_push_c_str(&filter->data_buf, el.string)) return 0; // push a pointer of the string in the databuf as an element - el.string = str; break; } - if (!cursor_push(&filter->elem_buf, (unsigned char*)&el, sizeof(el))) + if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset, + sizeof(offset))) { return 0; + } - filter->current->count++; + current->count++; return 1; } @@ -747,11 +771,12 @@ static int ndb_filter_set_elem_type(struct ndb_filter *filter, enum ndb_generic_element_type elem_type) { enum ndb_generic_element_type current_elem_type; + struct ndb_filter_elements *current; - if (!filter->current) + if (!(current = ndb_filter_current_element(filter))) return 0; - current_elem_type = filter->current->field.elem_type; + current_elem_type = current->field.elem_type; // element types must be uniform if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { @@ -759,7 +784,7 @@ static int ndb_filter_set_elem_type(struct ndb_filter *filter, return 0; } - filter->current->field.elem_type = elem_type; + current->field.elem_type = elem_type; return 1; } @@ -767,12 +792,13 @@ static int ndb_filter_set_elem_type(struct ndb_filter *filter, int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) { union ndb_filter_element el; + struct ndb_filter_elements *current; - if (!filter->current) + if (!(current = ndb_filter_current_element(filter))) return 0; // only generic queries are allowed to have strings - switch (filter->current->field.type) { + switch (current->field.type) { case NDB_FILTER_SINCE: case NDB_FILTER_UNTIL: case NDB_FILTER_LIMIT: @@ -794,10 +820,11 @@ int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) { union ndb_filter_element el; - if (!filter->current) + struct ndb_filter_elements *current; + if (!(current = ndb_filter_current_element(filter))) return 0; - switch (filter->current->field.type) { + switch (current->field.type) { case NDB_FILTER_IDS: case NDB_FILTER_AUTHORS: case NDB_FILTER_TAGS: @@ -817,12 +844,13 @@ int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) { union ndb_filter_element el; + struct ndb_filter_elements *current; - if (!filter->current) + if (!(current = ndb_filter_current_element(filter))) return 0; // only certain filter types allow pushing id elements - switch (filter->current->field.type) { + switch (current->field.type) { case NDB_FILTER_SINCE: case NDB_FILTER_UNTIL: case NDB_FILTER_LIMIT: @@ -843,11 +871,13 @@ int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id return ndb_filter_add_element(filter, el); } -static int ndb_tag_filter_matches(struct ndb_filter_elements *els, +static int ndb_tag_filter_matches(struct ndb_filter *filter, + struct ndb_filter_elements *els, struct ndb_note *note) { int i; - union ndb_filter_element el; + const unsigned char *id; + const char *el_str; struct ndb_iterator iter, *it = &iter; struct ndb_str str; @@ -894,14 +924,15 @@ static int ndb_tag_filter_matches(struct ndb_filter_elements *els, } for (i = 0; i < els->count; i++) { - el = els->elements[i]; switch (els->field.elem_type) { case NDB_ELEMENT_ID: - if (!memcmp(el.id, str.id, 32)) + id = ndb_filter_get_id_element(filter, els, i); + if (!memcmp(id, str.id, 32)) return 1; break; case NDB_ELEMENT_STRING: - if (!strcmp(el.string, str.str)) + el_str = ndb_filter_get_string_element(filter, els, i); + if (!strcmp(el_str, str.str)) return 1; break; case NDB_ELEMENT_UNKNOWN: @@ -913,12 +944,32 @@ static int ndb_tag_filter_matches(struct ndb_filter_elements *els, return 0; } +struct search_id_state { + struct ndb_filter *filter; + struct ndb_filter_elements *els; + unsigned char *key; +}; + static int compare_ids(const void *pa, const void *pb) { - const unsigned char *a = *(const unsigned char **)pa; - const unsigned char *b = *(const unsigned char **)pb; + unsigned char *a = *(unsigned char **)pa; + unsigned char *b = *(unsigned char **)pb; + + return memcmp(a, b, 32); +} + +static int search_ids(const void *ctx, const void *mid_ptr) +{ + struct search_id_state *state; + uint32_t mid; + + state = (struct search_id_state *)ctx; + mid = *(uint32_t *)mid_ptr; + + unsigned char *mid_id = ndb_filter_elements_data(state->filter, mid); + assert(mid_id); - return memcmp(a, b, 32); + return memcmp(state->key, mid_id, 32); } static int compare_kinds(const void *pa, const void *pb) @@ -937,17 +988,21 @@ static int compare_kinds(const void *pa, const void *pb) } } - +// // returns 1 if a filter matches a note static int ndb_filter_matches_with(struct ndb_filter *filter, struct ndb_note *note, int already_matched) { int i, j; - unsigned char *id; struct ndb_filter_elements *els; + struct search_id_state state; + + state.filter = filter; for (i = 0; i < filter->num_elements; i++) { - els = filter->elements[i]; + els = ndb_filter_get_elements(filter, i); + state.els = els; + assert(els); // if we know we already match from a query scan result, // we can skip this check @@ -957,36 +1012,36 @@ static int ndb_filter_matches_with(struct ndb_filter *filter, switch (els->field.type) { case NDB_FILTER_KINDS: for (j = 0; j < els->count; j++) { - if ((unsigned int)els->elements[j].integer == note->kind) + if ((unsigned int)els->elements[j] == note->kind) goto cont; } break; case NDB_FILTER_IDS: - id = note->id; - if (bsearch(&id, &els->elements[0], els->count, - sizeof(els->elements[0].id), compare_ids)) { + state.key = ndb_note_id(note); + if (bsearch(&state, &els->elements[0], els->count, + sizeof(els->elements[0]), search_ids)) { continue; } break; case NDB_FILTER_AUTHORS: - id = note->pubkey; - if (bsearch(&id, &els->elements[0], els->count, - sizeof(els->elements[0].id), compare_ids)) { + state.key = ndb_note_pubkey(note); + if (bsearch(&state, &els->elements[0], els->count, + sizeof(els->elements[0]), search_ids)) { continue; } break; case NDB_FILTER_TAGS: - if (ndb_tag_filter_matches(els, note)) + if (ndb_tag_filter_matches(filter, els, note)) continue; break; case NDB_FILTER_SINCE: assert(els->count == 1); - if (note->created_at >= els->elements[0].integer) + if (note->created_at >= els->elements[0]) continue; break; case NDB_FILTER_UNTIL: assert(els->count == 1); - if (note->created_at < els->elements[0].integer) + if (note->created_at < els->elements[0]) continue; case NDB_FILTER_LIMIT: cont: @@ -1005,27 +1060,47 @@ int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) return ndb_filter_matches_with(filter, note, 0); } +// because elements are stored as offsets and qsort doesn't support context, +// we do a dumb thing where we convert elements to pointers and back if we +// are doing an id or string sort +static void sort_filter_elements(struct ndb_filter *filter, + struct ndb_filter_elements *els, + int (*cmp)(const void *, const void *)) +{ + int i; + + assert(ndb_filter_elem_is_ptr(&els->field)); + + for (i = 0; i < els->count; i++) + els->elements[i] += (uint64_t)filter->data_buf.start; + + qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp); + + for (i = 0; i < els->count; i++) + els->elements[i] -= (uint64_t)filter->data_buf.start; +} + void ndb_filter_end_field(struct ndb_filter *filter) { + int cur_offset; struct ndb_filter_elements *cur; - cur = filter->current; + cur_offset = filter->current; - if (cur == NULL) + if (!(cur = ndb_filter_current_element(filter))) return; - filter->elements[filter->num_elements++] = cur; + filter->elements[filter->num_elements++] = cur_offset; // sort elements for binary search switch (cur->field.type) { case NDB_FILTER_IDS: case NDB_FILTER_AUTHORS: - qsort(&cur->elements[0], cur->count, - sizeof(cur->elements[0].id), compare_ids); + sort_filter_elements(filter, cur, compare_ids); break; case NDB_FILTER_KINDS: qsort(&cur->elements[0], cur->count, - sizeof(cur->elements[0].integer), compare_kinds); + sizeof(cur->elements[0]), compare_kinds); break; case NDB_FILTER_TAGS: // TODO: generic tag search sorting @@ -1037,7 +1112,7 @@ void ndb_filter_end_field(struct ndb_filter *filter) break; } - filter->current = NULL; + filter->current = -1; } @@ -2422,7 +2497,8 @@ ndb_filter_get_elems(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) struct ndb_filter_elements *els; for (i = 0; i < filter->num_elements; i++) { - els = filter->elements[i]; + els = ndb_filter_get_elements(filter, i); + assert(els); if (els->field.type == typ) { return els; } @@ -2431,7 +2507,7 @@ ndb_filter_get_elems(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) return NULL; } -static union ndb_filter_element * +static uint64_t * ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) { struct ndb_filter_elements *els; @@ -2443,10 +2519,10 @@ ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) static uint64_t *ndb_filter_get_int(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) { - union ndb_filter_element *el = NULL; - if (!(el = ndb_filter_get_elem(filter, typ))) - return 0; - return &el->integer; + uint64_t *el; + if (NULL == (el = ndb_filter_get_elem(filter, typ))) + return NULL; + return el; } static inline int push_query_result(struct ndb_query_results *results, @@ -2527,7 +2603,7 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn, if (query_is_full(results, limit)) break; - id = (unsigned char*)ids->elements[i].id; + id = ndb_filter_get_id_element(filter, ids, i); ndb_tsid_init(&tsid, (unsigned char *)id, until); k.mv_data = &tsid; @@ -2615,7 +2691,7 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn, unsigned char key_buffer[255]; struct ndb_note *note; struct ndb_filter_elements *tags; - union ndb_filter_element *tag; + unsigned char *tag; struct ndb_query_result res; db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; @@ -2631,13 +2707,13 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn, return 0; for (i = 0; i < tags->count; i++) { - tag = &tags->elements[i]; + tag = ndb_filter_get_id_element(filter, tags, i); taglen = tags->field.elem_type == NDB_ELEMENT_ID - ? 32 : strlen(tag->string); + ? 32 : strlen((const char*)tag); if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), - tags->field.tag, tag->id, taglen, + tags->field.tag, tag, taglen, until))) return 0; @@ -2657,7 +2733,7 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn, if (taglen != k.mv_size - 9) break; - if (memcmp(k.mv_data+1, tag->id, k.mv_size-9)) + if (memcmp(k.mv_data+1, tag, k.mv_size-9)) break; note_id = *(uint64_t*)v.mv_data; @@ -2714,7 +2790,7 @@ static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, if (query_is_full(results, limit)) break; - kind = kinds->elements[i].integer; + kind = kinds->elements[i]; ndb_debug("kind %" PRIu64 "\n", kind); ndb_u64_tsid_init(&tsid, kind, until); @@ -3094,14 +3170,6 @@ static int prefix_count(const char *str1, int len1, const char *str2, int len2) return count; } -static void ndb_print_text_search_key(struct ndb_text_search_key *key) -{ - printf("K<'%.*s' %" PRIu64 " %" PRIu64 " note_id:%" PRIu64 ">", key->str_len, key->str, - key->word_index, - key->timestamp, - key->note_id); -} - static int ndb_prefix_matches(struct ndb_text_search_result *result, struct ndb_word *search_word) { @@ -3565,7 +3633,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, note = written->note->note; if (ndb_filter_group_matches(&sub->group, note)) { - //ndb_debug("pushing note\n"); + ndb_debug("pushing note\n"); if (!prot_queue_push(&sub->inbox, &written->note_id)) { ndb_debug("couldn't push note to subscriber"); } diff --git a/src/nostrdb.h b/src/nostrdb.h @@ -234,7 +234,9 @@ struct ndb_filter_field { struct ndb_filter_elements { struct ndb_filter_field field; int count; - union ndb_filter_element elements[0]; + + // this needs to be pointer size for reasons + uint64_t elements[0]; }; struct ndb_filter { @@ -242,8 +244,10 @@ struct ndb_filter { struct cursor data_buf; int num_elements; int finalized; - struct ndb_filter_elements *current; - struct ndb_filter_elements *elements[NDB_NUM_FILTERS]; + int current; + + // struct ndb_filter_elements offsets into elem_buf + int elements[NDB_NUM_FILTERS]; }; struct ndb_config { @@ -475,6 +479,8 @@ int ndb_filter_init(struct ndb_filter *); int ndb_filter_add_id_element(struct ndb_filter *, const unsigned char *id); int ndb_filter_add_int_element(struct ndb_filter *, uint64_t integer); int ndb_filter_add_str_element(struct ndb_filter *, const char *str); + +struct ndb_filter_elements *ndb_filter_current_element(struct ndb_filter *); int ndb_filter_start_field(struct ndb_filter *, enum ndb_filter_fieldtype); int ndb_filter_start_tag_field(struct ndb_filter *, char tag); int ndb_filter_matches(struct ndb_filter *, struct ndb_note *); diff --git a/test.c b/test.c @@ -62,9 +62,9 @@ static void test_filters() assert(ndb_filter_add_int_element(f, 1337)); assert(ndb_filter_add_int_element(f, 2)); - current = f->current; - assert(f->current->count == 2); - assert(f->current->field.type == NDB_FILTER_KINDS); + current = ndb_filter_current_element(f); + assert(current->count == 2); + assert(current->field.type == NDB_FILTER_KINDS); // can't start if we've already started assert(ndb_filter_start_field(f, NDB_FILTER_KINDS) == 0); @@ -73,8 +73,8 @@ static void test_filters() ndb_filter_end(f); // should be sorted after end - assert(current->elements[0].integer == 2); - assert(current->elements[1].integer == 1337); + assert(current->elements[0] == 2); + assert(current->elements[1] == 1337); // try matching the filter assert(ndb_filter_matches(f, note)); @@ -115,7 +115,7 @@ static void test_filters() assert(ndb_filter_add_id_element(f, ndb_note_pubkey(note))); ndb_filter_end_field(f); ndb_filter_end(f); - assert(f->current == NULL); + assert(f->current == -1); assert(ndb_filter_matches(f, note)); ndb_filter_destroy(f); @@ -1225,8 +1225,9 @@ static void test_query() f = &filters[0]; ndb_filter_init(f); ndb_filter_start_field(f, NDB_FILTER_IDS); - ndb_filter_add_id_element(f, id); ndb_filter_add_id_element(f, id2); + ndb_filter_add_id_element(f, id); + assert(ndb_filter_current_element(f)->count == 2); ndb_filter_end_field(f); ndb_filter_end(f); @@ -1258,6 +1259,7 @@ static void test_query() count = 0; assert(ndb_query(&txn, f, 1, results, cap, &count)); ndb_print_kind_keys(&txn); + printf("count %d\n", count); assert(count == 2); assert(!strcmp(ndb_note_content(results[0].note), "hmm")); assert(!strcmp(ndb_note_content(results[1].note), "what"));