commit c00473a296d69f3ac5db2c814beaf0e75eb0b3a5
parent 82c94b40a34d5a29785b17a30e2e6afcdfdb2dd5
Author: William Casarin <jb55@jb55.com>
Date: Thu, 4 Jan 2024 14:39:34 -0800
Initial nostrdb queries
Still a lot more work to do, but this is at least a proof of concept for
querying nostrdb using filters.
Diffstat:
M | src/nostrdb.c | | | 237 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- |
M | src/nostrdb.h | | | 6 | +++++- |
M | test.c | | | 50 | ++++++++++++++++++++++++++++++++++++++++++++++++++ |
3 files changed, 284 insertions(+), 9 deletions(-)
diff --git a/src/nostrdb.c b/src/nostrdb.c
@@ -836,7 +836,8 @@ static int compare_kinds(const void *pa, const void *pb)
// returns 1 if a filter matches a note
-int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
+static int ndb_filter_matches_with(struct ndb_filter *filter,
+ struct ndb_note *note, int already_matched)
{
int i, j;
unsigned char *id;
@@ -845,6 +846,11 @@ int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
for (i = 0; i < filter->num_elements; i++) {
els = filter->elements[i];
+ // if we know we already match from a query scan result,
+ // we can skip this check
+ if ((1 << els->field.type) & already_matched)
+ continue;
+
switch (els->field.type) {
case NDB_FILTER_KINDS:
for (j = 0; j < els->count; j++) {
@@ -891,6 +897,11 @@ cont:
return 1;
}
+int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
+{
+ return ndb_filter_matches_with(filter, note, 0);
+}
+
void ndb_filter_end_field(struct ndb_filter *filter)
{
struct ndb_filter_elements *cur;
@@ -2280,22 +2291,232 @@ static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
return 1;
}
-/*
-static int ndb_filter_query(struct ndb *ndb, struct ndb_filter *filter)
+static int ndb_filter_group_add_filters(struct ndb_filter_group *group,
+ struct ndb_filter *filters,
+ int num_filters)
+{
+ int i;
+
+ for (i = 0; i < num_filters; i++) {
+ if (!ndb_filter_group_add(group, &filters[i]))
+ return 0;
+ }
+
+ return 1;
+}
+
+static int ndb_filter_int(struct ndb_filter *filter,
+ enum ndb_filter_fieldtype typ, uint64_t *lim)
{
+ int i;
+ struct ndb_filter_elements *els;
+
+ for (i = 0; i < filter->num_elements; i++) {
+ els = filter->elements[i];
+ if (els->field.type == typ) {
+ *lim = els->elements[0].integer;
+ return 1;
+ }
+ }
+
+ return 0;
}
-static int ndb_filter_cursors(struct ndb_filter *filter, struct ndb_cursor)
+static int ndb_filter_get_limit(struct ndb_filter *filter, uint64_t *lim)
{
+ return ndb_filter_int(filter, NDB_FILTER_LIMIT, lim);
}
-int ndb_query(struct ndb *ndb, struct ndb_filter **filters, int num_filters)
+static int ndb_filter_get_until(struct ndb_filter *filter, uint64_t *lim)
{
- struct ndb_filter_group group;
- ndb_filter_group_init(&group);
+ return ndb_filter_int(filter, NDB_FILTER_UNTIL, lim);
+}
+static int ndb_filter_get_since(struct ndb_filter *filter, uint64_t *lim)
+{
+ return ndb_filter_int(filter, NDB_FILTER_SINCE, lim);
+}
+
+static int ndb_query_filter_id(struct ndb_txn *txn, struct ndb_filter *filter,
+ MDB_cursor *cur, const unsigned char *id,
+ uint64_t since, int *matched,
+ struct ndb_query_result *res)
+{
+ MDB_val k, v;
+ uint64_t note_id;
+ struct ndb_tsid tsid, *ptsid;
+
+ res->note = NULL;
+
+ ndb_tsid_init(&tsid, (unsigned char *)id, since);
+
+ k.mv_data = &tsid;
+ k.mv_size = sizeof(tsid);
+
+ if (!ndb_cursor_start(cur, &k, &v))
+ return 0;
+
+ ptsid = (struct ndb_tsid *)k.mv_data;
+ note_id = *(uint64_t*)v.mv_data;
+
+ if (memcmp(id, ptsid->id, 32) == 0)
+ *matched |= 1 << NDB_FILTER_AUTHORS;
+ else
+ return 1;
+
+ // get the note because we need it to match against the filter
+ if (!(res->note = ndb_get_note_by_key(txn, note_id, NULL)))
+ return 1;
+
+ // Sure this particular lookup matched the index query, but does it
+ // match the entire filter? Check! We also pass in things we've already
+ // matched via the filter so we don't have to check again. This can be
+ // pretty important for filters with a large number of entries.
+ if (!ndb_filter_matches_with(filter, res->note, *matched))
+ return 1;
+
+ return 2;
+}
+
+static inline int push_query_result(struct cursor *res,
+ struct ndb_query_result *result)
+{
+ return cursor_push(res, (unsigned char*)result, sizeof(*result));
+}
+
+static int compare_query_results(const void *pa, const void *pb)
+{
+ struct ndb_query_result *a, *b;
+
+ a = (struct ndb_query_result *)pa;
+ b = (struct ndb_query_result *)pb;
+
+ if (a->note->created_at == b->note->created_at) {
+ return memcmp(a->note->id, b->note->id, 32);
+ } else if (a->note->created_at > b->note->created_at) {
+ return -1;
+ } else {
+ return 1;
+ }
+}
+
+static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
+ struct ndb_query_result *results, int capacity,
+ int *results_out)
+{
+ struct ndb_filter_elements *els;
+ struct ndb_query_result res;
+ struct cursor results_arr;
+ uint64_t limit, since, until;
+ const unsigned char *id;
+ int i, k, rc;
+ MDB_cursor *cur;
+ MDB_dbi db;
+
+ since = UINT64_MAX;
+ until = UINT64_MAX;
+ limit = capacity;
+
+ ndb_filter_get_limit(filter, &limit);
+ ndb_filter_get_since(filter, &since);
+ ndb_filter_get_until(filter, &until);
+
+ limit = min(capacity, limit);
+ make_cursor((unsigned char *)results,
+ ((unsigned char *)results) + limit * sizeof(*results),
+ &results_arr);
+
+ for (i = 0; i < filter->num_elements; i++) {
+ if (results_arr.p >= results_arr.end)
+ goto done;
+
+ els = filter->elements[i];
+ switch (els->field.type) {
+ case NDB_FILTER_IDS:
+ int matched = 0;
+ db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
+ if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
+ return 0;
+
+ // for each id in our ids filter, find in the db
+ for (k = 0; k < els->count; k++) {
+ if (results_arr.p >= results_arr.end) {
+ mdb_cursor_close(cur);
+ goto done;
+ }
+
+ id = els->elements[k].id;
+ if (!(rc = ndb_query_filter_id(txn, filter, cur,
+ id, since,
+ &matched,
+ &res))) {
+ // there was a fatal error
+ mdb_cursor_close(cur);
+ return 0;
+ }
+
+ // no match, just try next id
+ if (rc == 1)
+ continue;
+
+ // rc > 1, matched!
+ if (!push_query_result(&results_arr, &res)) {
+ // this should never happen, but if
+ // it fails to push that means there
+ // are no more result to push,
+ // so just return
+ mdb_cursor_close(cur);
+ goto done;
+ }
+
+ // look for more ids... continue!
+ }
+
+ mdb_cursor_close(cur);
+ break;
+ case NDB_FILTER_AUTHORS:
+ break;
+ case NDB_FILTER_KINDS:
+ break;
+ case NDB_FILTER_GENERIC:
+ break;
+ case NDB_FILTER_SINCE:
+ case NDB_FILTER_UNTIL:
+ case NDB_FILTER_LIMIT:
+ break;
+ }
+ }
+
+done:
+ *results_out = cursor_count(&results_arr, sizeof(*results));
+ return 1;
+}
+
+int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters,
+ struct ndb_query_result *results, int result_capacity, int *count)
+{
+ int i, out;
+ struct ndb_query_result *p = results;
+
+ *count = 0;
+
+ for (i = 0; i < num_filters; i++) {
+ if (!ndb_query_filter(txn, &filters[i], p,
+ result_capacity, &out)) {
+ return 0;
+ }
+
+ *count += out;
+ p += out;
+ result_capacity -= out;
+ if (result_capacity <= 0)
+ break;
+ }
+
+ // sort results
+ qsort(results, *count, sizeof(*results), compare_query_results);
+ return 1;
}
-*/
static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
uint64_t note_key)
diff --git a/src/nostrdb.h b/src/nostrdb.h
@@ -397,6 +397,10 @@ struct ndb_block_iterator {
unsigned char *p;
};
+struct ndb_query_result {
+ struct ndb_note *note;
+};
+
// CONFIG
void ndb_default_config(struct ndb_config *);
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads);
@@ -476,7 +480,7 @@ void ndb_text_search_config_set_order(struct ndb_text_search_config *, enum ndb_
void ndb_text_search_config_set_limit(struct ndb_text_search_config *, int limit);
// QUERY
-void ndb_query(struct ndb_filter **, int num_filters);
+int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, struct ndb_query_result *results, int result_capacity, int *count);
// STATS
int ndb_stat(struct ndb *ndb, struct ndb_stat *stat);
diff --git a/test.c b/test.c
@@ -1141,6 +1141,55 @@ static void test_fast_strchr()
assert(fast_strchr(testStr6, 'm', strlen(testStr6)) == testStr6 + 38);
}
+static void test_query()
+{
+ struct ndb *ndb;
+ struct ndb_txn txn;
+ struct ndb_filter filter, *f = &filter;
+ struct ndb_config config;
+ struct ndb_query_result results[2];
+ int count;
+ uint64_t subid, note_ids[2];
+ ndb_default_config(&config);
+
+ const unsigned char id[] = {
+ 0x03, 0x36, 0x94, 0x8b, 0xdf, 0xbf, 0x5f, 0x93, 0x98, 0x02, 0xeb, 0xa0,
+ 0x3a, 0xa7, 0x87, 0x35, 0xc8, 0x28, 0x25, 0x21, 0x1e, 0xec, 0xe9, 0x87,
+ 0xa6, 0xd2, 0xe2, 0x0e, 0x3c, 0xff, 0xf9, 0x30
+ };
+
+ const unsigned char id2[] = {
+ 0x0a, 0x35, 0x0c, 0x58, 0x51, 0xaf, 0x6f, 0x6c, 0xe3, 0x68, 0xba, 0xb4,
+ 0xe2, 0xd4, 0xfe, 0x44, 0x2a, 0x13, 0x18, 0x64, 0x2c, 0x7f, 0xe5, 0x8d,
+ 0xe5, 0x39, 0x21, 0x03, 0x70, 0x0c, 0x10, 0xfc
+ };
+
+
+ const char *ev = "[\"EVENT\",\"s\",{\"id\": \"0336948bdfbf5f939802eba03aa78735c82825211eece987a6d2e20e3cfff930\",\"pubkey\": \"aeadd3bf2fd92e509e137c9e8bdf20e99f286b90be7692434e03c015e1d3bbfe\",\"created_at\": 1704401597,\"kind\": 1,\"tags\": [],\"content\": \"hello\",\"sig\": \"232395427153b693e0426b93d89a8319324d8657e67d23953f014a22159d2127b4da20b95644b3e34debd5e20be0401c283e7308ccb63c1c1e0f81cac7502f09\"}]";
+
+ const char *ev2 = "[\"EVENT\",\"s\",{\"id\": \"0a350c5851af6f6ce368bab4e2d4fe442a1318642c7fe58de5392103700c10fc\",\"pubkey\": \"dfa3fc062f7430dab3d947417fd3c6fb38a7e60f82ffe3387e2679d4c6919b1d\",\"created_at\": 1704404822,\"kind\": 1,\"tags\": [],\"content\": \"hello2\",\"sig\": \"48a0bb9560b89ee2c6b88edcf1cbeeff04f5e1b10d26da8564cac851065f30fa6961ee51f450cefe5e8f4895e301e8ffb2be06a2ff44259684fbd4ea1c885696\"}]";
+
+ assert(ndb_init(&ndb, test_dir, &config));
+
+ ndb_filter_init(f);
+ ndb_filter_start_field(f, NDB_FILTER_IDS);
+ ndb_filter_add_id_element(f, id);
+ ndb_filter_add_id_element(f, id2);
+ ndb_filter_end_field(f);
+
+ assert((subid = ndb_subscribe(ndb, f, 1)));
+ assert(ndb_process_event(ndb, ev, strlen(ev)));
+ assert(ndb_process_event(ndb, ev2, strlen(ev2)));
+ assert(ndb_wait_for_notes(ndb, subid, note_ids, 2));
+
+ ndb_begin_query(ndb, &txn);
+ assert(ndb_query(&txn, f, 1, results, 2, &count));
+ assert(count == 2);
+ assert(0 == memcmp(ndb_note_id(results[0].note), id2, 32));
+ ndb_end_query(&txn);
+ ndb_destroy(ndb);
+}
+
static void test_fulltext()
{
struct ndb *ndb;
@@ -1259,6 +1308,7 @@ static void test_subscriptions()
}
int main(int argc, const char *argv[]) {
+ test_query();
test_parse_content();
test_url_parsing();
test_subscriptions();