nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit 591499c2f63c3cbb260cfd73eed881a131571a91
parent 33c47407d9057fafd1dcdfa5fbd88eff0b524b0b
Author: William Casarin <jb55@jb55.com>
Date:   Mon,  8 Jan 2024 16:18:30 -0800

query: add tag index and tag queries

Diffstat:
MTODO | 1+
Mndb.c | 17++++++++++++++++-
Msrc/nostrdb.c | 281++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Msrc/nostrdb.h | 2++
Mtest.c | 39++++++++++++++++++++++++++++++++++++++-
5 files changed, 320 insertions(+), 20 deletions(-)

diff --git a/TODO b/TODO @@ -3,3 +3,4 @@ execution plan for tags query execution plan for created_at query note kind index rebuild migration (A) filter from json +tags index migration diff --git a/ndb.c b/ndb.c @@ -92,6 +92,7 @@ static void print_stats(struct ndb_stat *stat) int ndb_print_search_keys(struct ndb_txn *txn); int ndb_print_kind_keys(struct ndb_txn *txn); +int ndb_print_tag_keys(struct ndb_txn *txn); static void print_note(struct ndb_note *note) { @@ -188,7 +189,7 @@ int main(int argc, char *argv[]) argc -= 2; current_field = 0; - for (i = 0; argc && i < 3; i++) { + for (i = 0; argc && i < 100; i++) { if (!strcmp(argv[0], "-k")) { if (current_field != NDB_FILTER_KINDS) ndb_filter_start_field(f, NDB_FILTER_KINDS); @@ -218,6 +219,16 @@ int main(int argc, char *argv[]) ndb_filter_end_field(f); argv += 2; argc -= 2; + } else if (!strcmp(argv[0], "-t")) { + if (current_field) { + ndb_filter_end_field(f); + current_field = 0; + } + ndb_filter_start_tag_field(f, 't'); + ndb_filter_add_str_element(f, argv[1]); + ndb_filter_end_field(f); + argv += 2; + argc -= 2; } } @@ -258,6 +269,10 @@ int main(int argc, char *argv[]) ndb_begin_query(ndb, &txn); ndb_print_kind_keys(&txn); ndb_end_query(&txn); + } else if (argc == 2 && !strcmp(argv[1], "print-tag-keys")) { + ndb_begin_query(ndb, &txn); + ndb_print_tag_keys(&txn); + ndb_end_query(&txn); } else { return usage(); } diff --git a/src/nostrdb.c b/src/nostrdb.c @@ -346,6 +346,32 @@ static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { return diff ? diff : len_diff<0 ? -1 : len_diff; } +static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) +{ + MDB_val va, vb; + uint64_t ts_a, ts_b; + int cmp; + + va.mv_data = a->mv_data; + va.mv_size = a->mv_size - 8; + + vb.mv_data = b->mv_data; + vb.mv_size = b->mv_size - 8; + + if ((cmp = mdb_cmp_memn(&va, &vb))) + return cmp; + + ts_a = *(uint64_t*)(va.mv_data + va.mv_size); + ts_b = *(uint64_t*)(vb.mv_data + vb.mv_size); + + if (ts_a < ts_b) + return -1; + else if (ts_a > ts_b) + return 1; + + return 0; +} + static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) { struct cursor ca, cb; @@ -777,6 +803,7 @@ static int ndb_tag_filter_matches(struct ndb_filter_elements *els, // we should not expect an id if (str.flag == NDB_PACKED_ID) continue; + break; case NDB_ELEMENT_UNKNOWN: default: @@ -2455,6 +2482,123 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn, return 1; } +// +// encode a tag index key +// +// consists of: +// +// u8 tag +// u8 tag_val_len +// [u8] tag_val_bytes +// u64 created_at +// +static int ndb_encode_tag_key(unsigned char *buf, int buf_size, + char tag, const unsigned char *val, + unsigned char val_len, + uint64_t timestamp) +{ + struct cursor writer; + int ok; + + // quick exit for obvious case where it will be too big. There can be + // values of val_len that still fail, but we just let the writer handle + // those failure cases + if (val_len >= buf_size) + return 0; + + make_cursor(buf, buf + buf_size, &writer); + + ok = + cursor_push_byte(&writer, tag) && + cursor_push(&writer, (unsigned char*)val, val_len) && + cursor_push(&writer, (unsigned char*)&timestamp, sizeof(timestamp)); + + if (!ok) + return 0; + + return writer.p - writer.start; +} + +static int ndb_query_plan_execute_tags(struct ndb_txn *txn, + struct ndb_filter *filter, + struct ndb_query_results *results, + int limit) +{ + MDB_cursor *cur; + MDB_dbi db; + MDB_val k, v; + int len, taglen, rc, i; + uint64_t *pint, until, note_id; + unsigned char key_buffer[255]; + struct ndb_note *note; + struct ndb_filter_elements *tags; + union ndb_filter_element *tag; + struct ndb_query_result res; + + db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; + + if (!(tags = ndb_filter_get_elems(filter, NDB_FILTER_TAGS))) + return 0; + + until = UINT64_MAX; + if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) + until = *pint; + + if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) + return 0; + + for (i = 0; i < tags->count; i++) { + tag = &tags->elements[i]; + + taglen = tags->field.elem_type == NDB_ELEMENT_ID + ? 32 : strlen(tag->string); + + if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), + tags->field.tag, tag->id, taglen, + until))) + return 0; + + k.mv_data = key_buffer; + k.mv_size = len; + + if (!ndb_cursor_start(cur, &k, &v)) + continue; + + // for each id in our ids filter, find in the db + while (!query_is_full(results, limit)) { + // check if tag value matches, bail if not + if (((unsigned char *)k.mv_data)[0] != tags->field.tag) + break; + + // check if tag value matches, bail if not + if (taglen != k.mv_size - 9) + break; + + if (memcmp(k.mv_data+1, tag->id, k.mv_size-9)) + break; + + note_id = *(uint64_t*)v.mv_data; + + if (!(note = ndb_get_note_by_key(txn, note_id, NULL))) + continue; + + if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS)) + goto next; + + ndb_query_result_init(&res, note, note_id); + if (!push_query_result(results, &res)) + break; + +next: + if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) + break; + } + } + + mdb_cursor_close(cur); + return 1; +} + static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, struct ndb_filter *filter, struct ndb_query_results *results, @@ -2504,12 +2648,17 @@ static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, break; note_id = *(uint64_t*)v.mv_data; - if ((note = ndb_get_note_by_key(txn, note_id, NULL))) { - ndb_query_result_init(&res, note, note_id); - if (!push_query_result(results, &res)) - break; - } + if (!(note = ndb_get_note_by_key(txn, note_id, NULL))) + goto next; + + if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS)) + goto next; + ndb_query_result_init(&res, note, note_id); + if (!push_query_result(results, &res)) + break; + +next: if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) break; } @@ -2531,10 +2680,10 @@ static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter) // this is rougly similar to the heuristic in strfry's dbscan if (ids) { return NDB_PLAN_IDS; - } else if (tags) { - return NDB_PLAN_TAGS; - } else if (authors) { + } else if (authors && authors->count <= 5) { return NDB_PLAN_AUTHORS; + } else if (tags && tags->count <= 5) { + return NDB_PLAN_TAGS; } else if (kinds) { return NDB_PLAN_KINDS; } @@ -2549,6 +2698,7 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, { struct ndb_query_results results; uint64_t limit, *pint; + enum ndb_query_plan plan; limit = capacity; if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) @@ -2559,7 +2709,9 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, ((unsigned char *)res) + limit * sizeof(*res), &results.cur); - switch (ndb_filter_plan(filter)) { + plan = ndb_filter_plan(filter); + ndb_debug("using query plan %d\n", plan); + switch (plan) { // We have a list of ids, just open a cursor and jump to each once case NDB_PLAN_IDS: if (!ndb_query_plan_execute_ids(txn, filter, &results, limit)) @@ -2572,10 +2724,13 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, return 0; break; + case NDB_PLAN_TAGS: + if (!ndb_query_plan_execute_tags(txn, filter, &results, limit)) + return 0; + break; // TODO: finish query execution plans! case NDB_PLAN_CREATED: case NDB_PLAN_AUTHORS: - case NDB_PLAN_TAGS: return 0; } @@ -2609,6 +2764,61 @@ int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, return 1; } +static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note, + uint64_t note_key) +{ + unsigned char key_buffer[255]; + struct ndb_iterator iter; + struct ndb_str tkey, tval; + char tchar; + int len, rc; + MDB_val key, val; + MDB_dbi tags_db; + + tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; + + ndb_tags_iterate_start(note, &iter); + + while (ndb_tags_iterate_next(&iter)) { + if (iter.tag->count < 2) + continue; + + tkey = ndb_tag_str(note, iter.tag, 0); + + // we only write indices for 1-char tags. + tchar = tkey.str[0]; + if (tchar == 0 || tkey.str[1] != 0) + continue; + + tval = ndb_tag_str(note, iter.tag, 1); + len = ndb_str_len(&tval); + + if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), + tchar, tval.id, (unsigned char)len, + ndb_note_created_at(note)))) { + // this will fail when we try to encode a key that is + // too big + continue; + } + + //ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len); + + key.mv_data = key_buffer; + key.mv_size = len; + + val.mv_data = &note_key; + val.mv_size = sizeof(note_key); + + if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) { + ndb_debug("write note tag index to db failed: %s\n", + mdb_strerror(rc)); + return 0; + } + } + + return 1; +} + static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t note_key) { @@ -3203,13 +3413,9 @@ static uint64_t ndb_write_note(struct ndb_txn *txn, return 0; } - // write id index key clustered with created_at - if (!ndb_write_note_id_index(txn, note->note, note_key)) - return 0; - - // write note kind index - if (!ndb_write_note_kind_index(txn, note->note, note_key)) - return 0; + ndb_write_note_id_index(txn, note->note, note_key); + ndb_write_note_kind_index(txn, note->note, note_key); + ndb_write_note_tag_index(txn, note->note, note_key); // only parse content and do fulltext index on text and longform notes if (note->note->kind == 1 || note->note->kind == 30023) { @@ -3691,6 +3897,13 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map return 0; } + if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, + &lmdb->dbs[NDB_DB_NOTE_TAGS]))) { + fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc)); + return 0; + } + mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare); + // Commit the transaction if ((rc = mdb_txn_commit(txn))) { fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); @@ -5026,6 +5239,27 @@ void ndb_config_set_ingest_filter(struct ndb_config *config, config->filter_context = filter_ctx; } +int ndb_print_tag_keys(struct ndb_txn *txn) +{ + MDB_cursor *cur; + MDB_val k, v; + int i; + + if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur)) + return 0; + + i = 1; + while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { + printf("%d note_tags '%.*s' %" PRIu64 "\n", + i, (int)k.mv_size-8, (const char *)k.mv_data, + *(uint64_t*)(k.mv_data+(k.mv_size-8))); + + i++; + } + + return 1; +} + int ndb_print_kind_keys(struct ndb_txn *txn) { MDB_cursor *cur; @@ -5099,6 +5333,13 @@ struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) return ndb_note_str(note, &tag->strs[ind]); } +int ndb_str_len(struct ndb_str *str) +{ + if (str->flag == NDB_PACKED_ID) + return 32; + return strlen(str->str); +} + struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind) { return ndb_tag_str(iter->note, iter->tag, ind); @@ -5161,13 +5402,15 @@ void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) int ndb_tags_iterate_next(struct ndb_iterator *iter) { + struct ndb_tags *tags; + if (iter->tag == NULL || iter->index == -1) { iter->tag = iter->note->tags.tag; iter->index = 0; return iter->note->tags.count != 0; } - struct ndb_tags *tags = &iter->note->tags; + tags = &iter->note->tags; if (++iter->index < tags->count) { uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); @@ -5261,6 +5504,8 @@ const char *ndb_db_name(enum ndb_dbs db) return "note_fulltext"; case NDB_DB_NOTE_BLOCKS: return "note_blocks"; + case NDB_DB_NOTE_TAGS: + return "note_tags"; case NDB_DBS: return "count"; } diff --git a/src/nostrdb.h b/src/nostrdb.h @@ -170,6 +170,7 @@ enum ndb_dbs { NDB_DB_NOTE_KIND, // note kind index NDB_DB_NOTE_TEXT, // note fulltext index NDB_DB_NOTE_BLOCKS, // parsed note blocks for rendering + NDB_DB_NOTE_TAGS, // note tags index NDB_DBS, }; @@ -502,6 +503,7 @@ unsigned char *ndb_note_pubkey(struct ndb_note *note); unsigned char *ndb_note_sig(struct ndb_note *note); void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind); struct ndb_tags *ndb_note_tags(struct ndb_note *note); +int ndb_str_len(struct ndb_str *str); // TAGS void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter); diff --git a/test.c b/test.c @@ -1142,6 +1142,43 @@ static void test_fast_strchr() assert(fast_strchr(testStr6, 'm', strlen(testStr6)) == testStr6 + 38); } +static void test_tag_query() +{ + struct ndb *ndb; + struct ndb_txn txn; + struct ndb_filter filters[1], *f = &filters[0]; + struct ndb_config config; + struct ndb_query_result results[4]; + int count, cap; + uint64_t subid, note_ids[1]; + ndb_default_config(&config); + + cap = sizeof(results) / sizeof(results[0]); + + assert(ndb_init(&ndb, test_dir, &config)); + + const char *ev = "[\"EVENT\",\"s\",{\"id\": \"7fd6e4286e595b60448bf69d8ec4a472c5ad14521555813cdfce1740f012aefd\",\"pubkey\": \"b85beab689aed6a10110cc3cdd6e00ac37a2f747c4e60b18a31f4352a5bfb6ed\",\"created_at\": 1704762185,\"kind\": 1,\"tags\": [[\"t\",\"hashtag\"]],\"content\": \"hi\",\"sig\": \"5b05669af5a322730731b13d38667464ea3b45bef1861e26c99ef1815d7e8d557a76e06afa5fffa1dcd207402b92ae7dda6ef411ea515df2bca58d74e6f2772e\"}]"; + + f = &filters[0]; + ndb_filter_init(f); + ndb_filter_start_tag_field(f, 't'); + ndb_filter_add_str_element(f, "hashtag"); + ndb_filter_end_field(f); + + assert((subid = ndb_subscribe(ndb, f, 1))); + assert(ndb_process_event(ndb, ev, strlen(ev))); + ndb_wait_for_notes(ndb, subid, note_ids, 1); + + ndb_begin_query(ndb, &txn); + + assert(ndb_query(&txn, f, 1, results, cap, &count)); + assert(count == 1); + assert(!strcmp(ndb_note_content(results[0].note), "hi")); + + ndb_end_query(&txn); + ndb_destroy(ndb); +} + static void test_query() { struct ndb *ndb; @@ -1179,7 +1216,6 @@ static void test_query() assert(ndb_init(&ndb, test_dir, &config)); - f = &filters[0]; ndb_filter_init(f); ndb_filter_start_field(f, NDB_FILTER_IDS); @@ -1340,6 +1376,7 @@ static void test_subscriptions() int main(int argc, const char *argv[]) { test_query(); + test_tag_query(); test_parse_content(); test_url_parsing(); test_subscriptions();