nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

nostrdb.c (159221B)


      1 
      2 #include "nostrdb.h"
      3 #include "jsmn.h"
      4 #include "hex.h"
      5 #include "cursor.h"
      6 #include "random.h"
      7 #include "ccan/crypto/sha256/sha256.h"
      8 #include "bolt11/bolt11.h"
      9 #include "bolt11/amount.h"
     10 #include "lmdb.h"
     11 #include "util.h"
     12 #include "cpu.h"
     13 #include "block.h"
     14 #include "threadpool.h"
     15 #include "protected_queue.h"
     16 #include "memchr.h"
     17 #include "print_util.h"
     18 #include <stdlib.h>
     19 #include <limits.h>
     20 #include <assert.h>
     21 
     22 #include "bindings/c/profile_json_parser.h"
     23 #include "bindings/c/profile_builder.h"
     24 #include "bindings/c/meta_builder.h"
     25 #include "bindings/c/meta_reader.h"
     26 #include "bindings/c/profile_verifier.h"
     27 #include "secp256k1.h"
     28 #include "secp256k1_ecdh.h"
     29 #include "secp256k1_schnorrsig.h"
     30 
// Two-argument maximum / minimum.
// NOTE: the selected argument is evaluated twice, so do not pass expressions
// with side effects (e.g. `max(i++, j)`).
#define max(a,b) ((a) > (b) ? (a) : (b))
#define min(a,b) ((a) < (b) ? (a) : (b))
     33 
// the maximum number of things threads pop and push in bulk
static const int THREAD_QUEUE_BATCH = 4096;

// maximum number of active subscriptions
// (sizes the `subscriptions` array in struct ndb_monitor)
#define MAX_SUBSCRIPTIONS 256
// NOTE(review): no use of MAX_SCAN_CURSORS is visible near these
// definitions; confirm its meaning at the use sites
#define MAX_SCAN_CURSORS 12
// maximum number of filters in a single struct ndb_filter_group
#define MAX_FILTERS    16

// the maximum size of inbox queues
static const int DEFAULT_QUEUE_SIZE = 1000000;


// increase if we need bigger filters
// (ndb_filter_init allocates NDB_FILTER_PAGES pages of an assumed 4096 bytes
// for each filter)
#define NDB_FILTER_PAGES 64
     48 
     49 #define ndb_flag_set(flags, f) ((flags & f) == f)
     50 
// Bit flags named after the individual note fields. Each flag occupies its
// own bit, and NDB_PARSED_ALL is the bitwise OR of all seven.
// NOTE(review): presumably these track which fields the JSON parser has
// seen so far; confirm at the use sites.
#define NDB_PARSED_ID           (1 << 0)
#define NDB_PARSED_PUBKEY       (1 << 1)
#define NDB_PARSED_SIG          (1 << 2)
#define NDB_PARSED_CREATED_AT   (1 << 3)
#define NDB_PARSED_KIND         (1 << 4)
#define NDB_PARSED_CONTENT      (1 << 5)
#define NDB_PARSED_TAGS         (1 << 6)
#define NDB_PARSED_ALL          (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)
     59 
// A migration step operating on the database handle; this is the type of
// the `fn` member of struct ndb_migration.
typedef int (*ndb_migrate_fn)(struct ndb *);
// Callback that receives one word as a pointer + length pair together with
// the word's index. The first argument is an opaque pointer.
// NOTE(review): presumably caller-supplied context; confirm at call sites.
typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len,
				  int word_index);
     63 
     64 // these must be byte-aligned, they are directly accessing the serialized data
     65 // representation
     66 #pragma pack(push, 1)
     67 
// A 4-byte string slot with three overlapping views of the same bytes:
// up to three inline characters followed by a flag byte, a 32-bit offset,
// or the raw bytes.
// NOTE(review): the offset presumably points into the note's string
// section; confirm against the accessors.
union ndb_packed_str {
	struct {
		char str[3];
		// we assume little endian everywhere. sorry not sorry.
		unsigned char flag; // NDB_PACKED_STR, etc
	} packed;

	uint32_t offset;
	unsigned char bytes[4];
};
     78 
// Header of a single serialized tag. The packed strings of the tag follow
// the header directly in memory (zero-length trailing array).
struct ndb_tag {
	uint16_t count; // number of entries in `strs`
	union ndb_packed_str strs[0];
};
     83 
// Header of the serialized tags section of a note. The tags themselves
// follow the header directly in memory (zero-length trailing array).
struct ndb_tags {
	uint16_t padding;
	uint16_t count; // number of tags
	struct ndb_tag tag[0];
};
     89 
// v1
// Serialized note layout. Declared under `#pragma pack(1)`, so the fields
// are laid out back to back with no compiler-inserted padding. The
// fixed-size fields come first and the variable-length tags section is last.
struct ndb_note {
	unsigned char version;    // v=1
	unsigned char padding[3]; // keep things aligned
	unsigned char id[32];
	unsigned char pubkey[32];
	unsigned char sig[64];

	uint64_t created_at;
	uint32_t kind;
	uint32_t content_length;
	union ndb_packed_str content;
	// NOTE(review): presumably the offset of the string section; confirm
	uint32_t strings;
	// nothing can come after tags since it contains variadic data
	struct ndb_tags tags;
};
    106 
    107 #pragma pack(pop)
    108 
    109 
// A single migration step, wrapping the function that performs it.
struct ndb_migration {
	ndb_migrate_fn fn;
};
    113 
// Pairs a flatcc builder with a buffer pointer.
// NOTE(review): presumably `flatbuf` is the finished buffer produced by
// `builder`; ownership is not visible here, confirm at the use sites.
struct ndb_profile_record_builder {
	flatcc_builder_t *builder;
	void *flatbuf;
};
    118 
// controls whether to continue or stop the json parser
enum ndb_idres {
	NDB_IDRES_CONT, // keep parsing
	NDB_IDRES_STOP, // stop parsing
};
    124 
// closure data for the id-detecting ingest controller: an LMDB transaction
// plus the lmdb handles.
// NOTE(review): the field name suggests a read-only transaction; confirm
// where it is opened.
struct ndb_ingest_controller
{
	MDB_txn *read_txn;
	struct ndb_lmdb *lmdb;
};
    131 
// Kinds of message understood by the writer thread.
enum ndb_writer_msgtype {
	NDB_WRITER_QUIT, // kill thread immediately
	NDB_WRITER_NOTE, // write a note to the db
	NDB_WRITER_PROFILE, // write a profile to the db
	NDB_WRITER_DBMETA, // write ndb metadata
	NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
	NDB_WRITER_BLOCKS, // write parsed note blocks
};
    140 
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
	// NOTE(review): presumably the key holding the database version;
	// confirm against the migration code
	NDB_META_KEY_VERSION = 1
};
    145 
// State for parsing a JSON document with jsmn while feeding a note builder.
// NOTE(review): the per-field descriptions below are inferred from the
// names and types; confirm against the parser functions.
struct ndb_json_parser {
	const char *json;           // input text
	int json_len;               // length of `json`
	struct ndb_builder builder; // note builder
	jsmn_parser json_parser;    // jsmn tokenizer state
	jsmntok_t *toks, *toks_end; // token array and its end
	int i;                      // presumably the current token index
	int num_tokens;             // presumably the number of parsed tokens
};
    155 
// useful to pass to threads on its own
// Bundles the LMDB environment with one database handle per NDB_DBS slot.
struct ndb_lmdb {
	MDB_env *env;
	MDB_dbi dbs[NDB_DBS];
};
    161 
// State of the writer: database handles, the subscription monitor, a
// thread id and an inbox queue with its buffer.
// NOTE(review): presumably `queue_buf`/`queue_buflen` describe the memory
// backing `inbox`, and `inbox` carries messages tagged with
// enum ndb_writer_msgtype; confirm in the writer setup code.
struct ndb_writer {
	struct ndb_lmdb *lmdb;
	struct ndb_monitor *monitor;

	void *queue_buf;
	int queue_buflen;
	pthread_t thread_id;

	struct prot_queue inbox;
};
    172 
// State of the ingester: a thread pool, a pointer to the writer, and an
// optional filter callback with its context.
// NOTE(review): presumably `filter` is invoked with `filter_context` to
// accept or reject incoming notes; confirm where it is called.
struct ndb_ingester {
	uint32_t flags;
	struct threadpool tp;
	struct ndb_writer *writer;
	void *filter_context;
	ndb_ingest_filter_fn filter;
};
    180 
// A fixed-capacity set of filters (at most MAX_FILTERS).
struct ndb_filter_group {
	struct ndb_filter filters[MAX_FILTERS];
	int num_filters; // presumably how many entries of `filters` are in use
};
    185 
// One active subscription: its id, the filters it was created with, and an
// inbox queue.
// NOTE(review): presumably matching results are pushed onto `inbox`;
// confirm in the monitor code.
struct ndb_subscription {
	uint64_t subid;
	struct ndb_filter_group group;
	struct prot_queue inbox;
};
    191 
// Table of subscriptions (at most MAX_SUBSCRIPTIONS) together with a
// subscription callback and its context pointer.
// NOTE(review): when `sub_cb` fires is not visible here; confirm.
struct ndb_monitor {
	struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
	ndb_sub_fn sub_cb;
	void *sub_cb_ctx;
	int num_subscriptions;
};
    198 
// Top-level database handle. It embeds (by value) the lmdb handles, the
// ingester, the subscription monitor and the writer.
struct ndb {
	struct ndb_lmdb lmdb;
	struct ndb_ingester ingester;
	struct ndb_monitor monitor;
	struct ndb_writer writer;
	int version;
	uint32_t flags; // setting flags
	// lmdb environ handles, etc
};
    208 
///
/// Query Plans
///
/// There are general strategies for performing certain types of query
/// depending on the filter. For example, for large contact list queries
/// with many authors, we simply do a descending scan on created_at
/// instead of doing 1000s of pubkey scans.
///
/// Query plans are calculated from filters via `ndb_filter_plan`
///
/// NOTE(review): each plan is named after a filter field; presumably it
/// selects the index that drives the scan. Confirm in `ndb_filter_plan`.
///
enum ndb_query_plan {
	NDB_PLAN_KINDS,
	NDB_PLAN_IDS,
	NDB_PLAN_AUTHORS,
	NDB_PLAN_CREATED,
	NDB_PLAN_TAGS,
};
    226 
// A clustered key with an id and a timestamp
struct ndb_tsid {
	unsigned char id[32]; // 32-byte id
	uint64_t timestamp;
};
    232 
// A u64 + timestamp id. Just using this for kinds at the moment.
// Same shape as struct ndb_tsid, but keyed by an integer instead of a
// 32-byte id.
struct ndb_u64_tsid {
	uint64_t u64; // kind, etc
	uint64_t timestamp;
};
    238 
// A word expressed as a pointer + length pair.
// NOTE(review): the explicit length suggests `word` is not necessarily
// null terminated; confirm at the use sites.
struct ndb_word
{
	const char *word;
	int word_len;
};
    244 
// A fixed-capacity list of words (at most MAX_TEXT_SEARCH_WORDS).
struct ndb_search_words
{
	struct ndb_word words[MAX_TEXT_SEARCH_WORDS];
	int num_words; // presumably how many entries of `words` are in use
};
    250 
    251 // ndb_text_search_key
    252 //
    253 // This is compressed when in lmdb:
    254 //
    255 //   note_id:    varint
    256 //   strlen:     varint
    257 //   str:        cstr
    258 //   timestamp:  varint
    259 //   word_index: varint
    260 //
    261 static int ndb_make_text_search_key(unsigned char *buf, int bufsize,
    262 				    int word_index, int word_len, const char *str,
    263 				    uint64_t timestamp, uint64_t note_id,
    264 				    int *keysize)
    265 {
    266 	struct cursor cur;
    267 	make_cursor(buf, buf + bufsize, &cur);
    268 
    269 	// TODO: need update this to uint64_t
    270 	// we push this first because our query function can pull this off
    271 	// quickly to check matches
    272 	if (!cursor_push_varint(&cur, (int32_t)note_id))
    273 		return 0;
    274 
    275 	// string length
    276 	if (!cursor_push_varint(&cur, word_len))
    277 		return 0;
    278 
    279 	// non-null terminated, lowercase string
    280 	if (!cursor_push_lowercase(&cur, str, word_len))
    281 		return 0;
    282 
    283 	// TODO: need update this to uint64_t
    284 	if (!cursor_push_varint(&cur, (int)timestamp))
    285 		return 0;
    286 
    287 	// the index of the word in the content so that we can do more accurate
    288 	// phrase searches
    289 	if (!cursor_push_varint(&cur, word_index))
    290 		return 0;
    291 
    292 	// pad to 8-byte alignment
    293 	if (!cursor_align(&cur, 8))
    294 		return 0;
    295 
    296 	*keysize = cur.p - cur.start;
    297 	assert((*keysize % 8) == 0);
    298 
    299 	return 1;
    300 }
    301 
    302 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize,
    303 					  int wordlen, const char *word,
    304 					  uint64_t timestamp, uint64_t note_id,
    305 					  int *keysize)
    306 {
    307 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    308 					timestamp, note_id, keysize);
    309 }
    310 
    311 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize,
    312 					int wordlen, const char *word,
    313 					int *keysize)
    314 {
    315 	uint64_t timestamp, note_id;
    316 	timestamp = 0;
    317 	note_id = 0;
    318 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    319 					timestamp, note_id, keysize);
    320 }
    321 
    322 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize,
    323 					 int wordlen, const char *word,
    324 					 int *keysize)
    325 {
    326 	uint64_t timestamp, note_id;
    327 	timestamp = INT32_MAX;
    328 	note_id = INT32_MAX;
    329 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    330 					timestamp, note_id, keysize);
    331 }
    332 
// Function that builds a search key for a word. Both
// ndb_make_text_search_key_low and ndb_make_text_search_key_high have this
// signature.
typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize);
    334 
    335 /** From LMDB: Compare two items lexically */
    336 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) {
    337 	int diff;
    338 	ssize_t len_diff;
    339 	unsigned int len;
    340 
    341 	len = a->mv_size;
    342 	len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size;
    343 	if (len_diff > 0) {
    344 		len = b->mv_size;
    345 		len_diff = 1;
    346 	}
    347 
    348 	diff = memcmp(a->mv_data, b->mv_data, len);
    349 	return diff ? diff : len_diff<0 ? -1 : len_diff;
    350 }
    351 
    352 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b)
    353 {
    354 	MDB_val va, vb;
    355 	uint64_t ts_a, ts_b;
    356 	int cmp;
    357 
    358 	va.mv_data = a->mv_data;
    359 	va.mv_size = a->mv_size - 8;
    360 
    361 	vb.mv_data = b->mv_data;
    362 	vb.mv_size = b->mv_size - 8;
    363 
    364 	if ((cmp = mdb_cmp_memn(&va, &vb)))
    365 		return cmp;
    366 
    367 	ts_a = *(uint64_t*)(va.mv_data + va.mv_size);
    368 	ts_b = *(uint64_t*)(vb.mv_data + vb.mv_size);
    369 
    370 	if (ts_a < ts_b)
    371 		return -1;
    372 	else if (ts_a > ts_b)
    373 		return 1;
    374 
    375 	return 0;
    376 }
    377 
// Comparator for fulltext search keys (see ndb_make_text_search_key for the
// serialized layout).
//
// Although the note id is stored first in the key, the sort order is:
//
//   1. the word bytes (lexical, shorter word first on a common prefix)
//   2. timestamp
//   3. note id
//   4. word index
//
// If any varint fails to decode, the keys are reported as equal (0).
static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b)
{
	struct cursor ca, cb;
	uint64_t sa, sb, nid_a, nid_b;
	MDB_val a2, b2;

	make_cursor(a->mv_data, a->mv_data + a->mv_size, &ca);
	make_cursor(b->mv_data, b->mv_data + b->mv_size, &cb);

	// note_id: pulled now because it is stored first, compared later
	if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b)))
		return 0;

	// string size
	if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb)))
		return 0;

	// view the word bytes in place, without copying
	a2.mv_data = ca.p;
	a2.mv_size = sa;

	b2.mv_data = cb.p;
	b2.mv_size = sb;

	int cmp = mdb_cmp_memn(&a2, &b2);
	if (cmp) return cmp;

	// skip over string
	// NOTE(review): the decoded lengths are not checked against the end of
	// the key here
	ca.p += sa;
	cb.p += sb;

	// timestamp (sa/sb are reused as scratch variables from here on)
	if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb)))
		return 0;

	if      (sa < sb) return -1;
	else if (sa > sb) return 1;

	// note_id
	if      (nid_a < nid_b) return -1;
	else if (nid_a > nid_b) return 1;

	// word index
	if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb)))
		return 0;

	if      (sa < sb) return -1;
	else if (sa > sb) return 1;

	return 0;
}
    428 
    429 static inline int ndb_unpack_text_search_key_noteid(
    430 		struct cursor *cur, uint64_t *note_id)
    431 {
    432 	if (!cursor_pull_varint(cur, note_id))
    433 		return 0;
    434 
    435 	return 1;
    436 }
    437 
    438 // faster peek of just the string instead of unpacking everything
    439 // this is used to quickly discard range query matches if there is no
    440 // common prefix
    441 static inline int ndb_unpack_text_search_key_string(struct cursor *cur,
    442 						    const char **str,
    443 						    int *str_len)
    444 {
    445 	uint64_t len;
    446 
    447 	if (!cursor_pull_varint(cur, &len))
    448 		return 0;
    449 
    450 	*str_len = len;
    451 
    452 	*str = (const char *)cur->p;
    453 
    454 	if (!cursor_skip(cur, *str_len))
    455 		return 0;
    456 
    457 	return 1;
    458 }
    459 
    460 // should be called after ndb_unpack_text_search_key_string. It continues
    461 // the unpacking of a text search key if we've already started it.
    462 static inline int
    463 ndb_unpack_remaining_text_search_key(struct cursor *cur,
    464 				     struct ndb_text_search_key *key)
    465 {
    466 	if (!cursor_pull_varint(cur, &key->timestamp))
    467 		return 0;
    468 
    469 	if (!cursor_pull_varint(cur, &key->word_index))
    470 		return 0;
    471 
    472 	return 1;
    473 }
    474 
    475 // unpack a fulltext search key
    476 //
    477 // full version of string + unpack remaining. This is split up because text
    478 // searching only requires to pull the string for prefix searching, and the
    479 // remaining is optional
    480 static inline int ndb_unpack_text_search_key(unsigned char *p, int len,
    481 				      struct ndb_text_search_key *key)
    482 {
    483 	struct cursor c;
    484 	make_cursor(p, p + len, &c);
    485 
    486 	if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id))
    487 		return 0;
    488 
    489 	if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len))
    490 		return 0;
    491 
    492 	return ndb_unpack_remaining_text_search_key(&c, key);
    493 }
    494 
    495 // Copies only lowercase characters to the destination string and fills the rest with null bytes.
    496 // `dst` and `src` are pointers to the destination and source strings, respectively.
    497 // `n` is the maximum number of characters to copy.
    498 static void lowercase_strncpy(char *dst, const char *src, int n) {
    499 	int j = 0, i = 0;
    500 
    501 	if (!dst || !src || n == 0) {
    502 		return;
    503 	}
    504 
    505 	while (src[i] != '\0' && j < n) {
    506 		dst[j++] = tolower(src[i++]);
    507 	}
    508 
    509 	// Null-terminate and fill the destination string
    510 	while (j < n) {
    511 		dst[j++] = '\0';
    512 	}
    513 }
    514 
    515 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) {
    516 	return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID;
    517 }
    518 
    519 // Copy the filter
    520 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src)
    521 {
    522 	size_t src_size, elem_size, data_size;
    523 
    524 	memcpy(dst, src, sizeof(*src));
    525 
    526 	elem_size = src->elem_buf.end - src->elem_buf.start;
    527 	data_size = src->data_buf.end - src->data_buf.start;
    528 	src_size = data_size + elem_size;
    529 
    530 	// let's only allow finalized filters to be cloned
    531 	if (!src || !src->finalized)
    532 		return 0;
    533 
    534 	dst->elem_buf.start = malloc(src_size);
    535 	dst->elem_buf.end = dst->elem_buf.start + elem_size;
    536 	dst->elem_buf.p = dst->elem_buf.end;
    537 
    538 	dst->data_buf.start = dst->elem_buf.start + elem_size;
    539 	dst->data_buf.end = dst->data_buf.start + data_size;
    540 	dst->data_buf.p = dst->data_buf.end;
    541 
    542 	if (dst->elem_buf.start == NULL)
    543 		return 0;
    544 
    545 	memcpy(dst->elem_buf.start, src->elem_buf.start, src_size);
    546 
    547 	return 1;
    548 }
    549 
    550 // "Finalize" the filter. This resizes the allocated heap buffers so that they
    551 // are as small as possible. This also prevents new fields from being added
    552 int ndb_filter_end(struct ndb_filter *filter)
    553 {
    554 #ifdef DEBUG
    555 	size_t orig_size;
    556 #endif
    557 	size_t data_len, elem_len;
    558 	if (filter->finalized == 1)
    559 		return 0;
    560 
    561 	// move the data buffer to the end of the element buffer and update
    562 	// all of the element pointers accordingly
    563 	data_len = filter->data_buf.p - filter->data_buf.start;
    564 	elem_len = filter->elem_buf.p - filter->elem_buf.start;
    565 #ifdef DEBUG
    566 	orig_size = filter->data_buf.end - filter->elem_buf.start;
    567 #endif
    568 
    569 	// cap the elem buff
    570 	filter->elem_buf.end = filter->elem_buf.p;
    571 
    572 	// move the data buffer to the end of the element buffer
    573 	memmove(filter->elem_buf.p, filter->data_buf.start, data_len);
    574 
    575 	// realloc the whole thing
    576 	filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len);
    577 	filter->elem_buf.end = filter->elem_buf.start + elem_len;
    578 	filter->elem_buf.p = filter->elem_buf.end;
    579 
    580 	filter->data_buf.start = filter->elem_buf.end;
    581 	filter->data_buf.end = filter->data_buf.start + data_len;
    582 	filter->data_buf.p = filter->data_buf.end;
    583 
    584 	filter->finalized = 1;
    585 
    586 	ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len);
    587 
    588 	return 1;
    589 }
    590 
    591 static inline struct ndb_filter_elements *
    592 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset)
    593 {
    594 	struct ndb_filter_elements *els;
    595 
    596 	if (offset < 0)
    597 		return NULL;
    598 
    599 	els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset);
    600 
    601 	if ((unsigned char *)els > filter->elem_buf.p)
    602 		return NULL;
    603 
    604 	return els;
    605 }
    606 
    607 struct ndb_filter_elements *
    608 ndb_filter_current_element(const struct ndb_filter *filter)
    609 {
    610 	return ndb_filter_get_elements_by_offset(filter, filter->current);
    611 }
    612 
    613 struct ndb_filter_elements *
    614 ndb_filter_get_elements(const struct ndb_filter *filter, int index)
    615 {
    616 	if (filter->num_elements <= 0)
    617 		return NULL;
    618 
    619 	if (index > filter->num_elements-1)
    620 		return NULL;
    621 
    622 	return ndb_filter_get_elements_by_offset(filter, filter->elements[index]);
    623 }
    624 
    625 static inline unsigned char *
    626 ndb_filter_elements_data(const struct ndb_filter *filter, int offset)
    627 {
    628 	unsigned char *data;
    629 
    630 	if (offset < 0)
    631 		return NULL;
    632 
    633 	data = filter->data_buf.start + offset;
    634 	if (data > filter->data_buf.p)
    635 		return NULL;
    636 
    637 	return data;
    638 }
    639 
    640 unsigned char *
    641 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index)
    642 {
    643 	return ndb_filter_elements_data(filter, els->elements[index]);
    644 }
    645 
    646 const char *
    647 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index)
    648 {
    649 	return (const char *)ndb_filter_elements_data(filter, els->elements[index]);
    650 }
    651 
    652 uint64_t *
    653 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index)
    654 {
    655 	return &els->elements[index];
    656 }
    657 
    658 uint64_t
    659 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index)
    660 {
    661 	return els->elements[index];
    662 }
    663 
    664 int ndb_filter_init(struct ndb_filter *filter)
    665 {
    666 	struct cursor cur;
    667 	int page_size, elem_pages, data_pages, buf_size;
    668 
    669 	page_size = 4096; // assuming this, not a big deal if we're wrong
    670 	elem_pages = NDB_FILTER_PAGES / 4;
    671 	data_pages = NDB_FILTER_PAGES - elem_pages;
    672 	buf_size = page_size * NDB_FILTER_PAGES;
    673 
    674 	unsigned char *buf = malloc(buf_size);
    675 	if (!buf)
    676 		return 0;
    677 
    678 	// init memory arena for the cursor
    679 	make_cursor(buf, buf + buf_size, &cur);
    680 
    681 	cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages);
    682 	cursor_slice(&cur, &filter->data_buf, page_size * data_pages);
    683 
    684 	// make sure we are fully allocated
    685 	assert(cur.p == cur.end);
    686 
    687 	// make sure elem_buf is the start of the buffer
    688 	assert(filter->elem_buf.start == cur.start);
    689 
    690 	filter->num_elements = 0;
    691 	filter->elements[0] = 0;
    692 	filter->current = -1;
    693 	filter->finalized = 0;
    694 
    695 	return 1;
    696 }
    697 
    698 void ndb_filter_destroy(struct ndb_filter *filter)
    699 {
    700 	if (filter->elem_buf.start)
    701 		free(filter->elem_buf.start);
    702 
    703 	memset(filter, 0, sizeof(*filter));
    704 }
    705 
    706 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field)
    707 {
    708 	switch (field) {
    709 	case NDB_FILTER_IDS: return "ids";
    710 	case NDB_FILTER_AUTHORS: return "authors";
    711 	case NDB_FILTER_KINDS: return "kinds";
    712 	case NDB_FILTER_TAGS: return "tags";
    713 	case NDB_FILTER_SINCE: return "since";
    714 	case NDB_FILTER_UNTIL: return "until";
    715 	case NDB_FILTER_LIMIT: return "limit";
    716 	}
    717 
    718 	return "unknown";
    719 }
    720 
    721 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag)
    722 {
    723 	int i;
    724 	struct ndb_filter_elements *els, *el;
    725 
    726 	if (ndb_filter_current_element(filter)) {
    727 		fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n");
    728 		return 0;
    729 	}
    730 
    731 	// you can only start and end fields once
    732 	for (i = 0; i < filter->num_elements; i++) {
    733 		el = ndb_filter_get_elements(filter, i);
    734 		assert(el);
    735 		// TODO: fix this tags check to try to find matching tags
    736 		if (el->field.type == field && field != NDB_FILTER_TAGS) {
    737 			fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n",
    738 					ndb_filter_field_name(field));
    739 			return 0;
    740 		}
    741 	}
    742 
    743 	filter->current = filter->elem_buf.p - filter->elem_buf.start;
    744 	els = ndb_filter_current_element(filter);
    745 	assert(els);
    746 
    747 	// advance elem buffer to the variable data section
    748 	if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) {
    749 		fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n",
    750 				ndb_filter_field_name(field));
    751 		return 0;
    752 	}
    753 
    754 	els->field.type = field;
    755 	els->field.tag = tag;
    756 	els->field.elem_type = 0;
    757 	els->count = 0;
    758 
    759 	return 1;
    760 }
    761 
    762 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field)
    763 {
    764 	return ndb_filter_start_field_impl(filter, field, 0);
    765 }
    766 
    767 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag)
    768 {
    769 	return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag);
    770 }
    771 
    772 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el)
    773 {
    774 	struct ndb_filter_elements *current;
    775 	uint64_t offset;
    776 
    777 	if (!(current = ndb_filter_current_element(filter)))
    778 		return 0;
    779 
    780 	offset = filter->data_buf.p - filter->data_buf.start;
    781 
    782 	switch (current->field.type) {
    783 	case NDB_FILTER_IDS:
    784 	case NDB_FILTER_AUTHORS:
    785 		if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32))
    786 			return 0;
    787 		break;
    788 	case NDB_FILTER_KINDS:
    789 		offset = el.integer;
    790 		break;
    791 	case NDB_FILTER_SINCE:
    792 	case NDB_FILTER_UNTIL:
    793 	case NDB_FILTER_LIMIT:
    794 		// only one allowed for since/until
    795 		if (current->count != 0)
    796 			return 0;
    797 		offset = el.integer;
    798 		break;
    799 	case NDB_FILTER_TAGS:
    800 		switch (current->field.elem_type) {
    801 		case NDB_ELEMENT_ID:
    802 			if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32))
    803 				return 0;
    804 			break;
    805 		case NDB_ELEMENT_STRING:
    806 			if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len))
    807 				return 0;
    808 			if (!cursor_push_byte(&filter->data_buf, 0))
    809 				return 0;
    810 			break;
    811 		case NDB_ELEMENT_INT:
    812 			// ints are not allowed in tag filters
    813 		case NDB_ELEMENT_UNKNOWN:
    814 			return 0;
    815 		}
    816 		// push a pointer of the string in the databuf as an element
    817 		break;
    818 	}
    819 
    820 	if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset,
    821 			 sizeof(offset))) {
    822 		return 0;
    823 	}
    824 
    825 	current->count++;
    826 
    827 	return 1;
    828 }
    829 
    830 static int ndb_filter_set_elem_type(struct ndb_filter *filter,
    831 				    enum ndb_generic_element_type elem_type)
    832 {
    833 	enum ndb_generic_element_type current_elem_type;
    834 	struct ndb_filter_elements *current;
    835 
    836 	if (!(current = ndb_filter_current_element(filter)))
    837 		return 0;
    838 
    839 	current_elem_type = current->field.elem_type;
    840 
    841 	// element types must be uniform
    842 	if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) {
    843 		fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n");
    844 		return 0;
    845 	}
    846 
    847 	current->field.elem_type = elem_type;
    848 
    849 	return 1;
    850 }
    851 
    852 
    853 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len)
    854 {
    855 	union ndb_filter_element el;
    856 	struct ndb_filter_elements *current;
    857 
    858 	if (!(current = ndb_filter_current_element(filter)))
    859 		return 0;
    860 
    861 	// only generic queries are allowed to have strings
    862 	switch (current->field.type) {
    863 	case NDB_FILTER_SINCE:
    864 	case NDB_FILTER_UNTIL:
    865 	case NDB_FILTER_LIMIT:
    866 	case NDB_FILTER_IDS:
    867 	case NDB_FILTER_AUTHORS:
    868 	case NDB_FILTER_KINDS:
    869 		return 0;
    870 	case NDB_FILTER_TAGS:
    871 		break;
    872 	}
    873 
    874 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING))
    875 		return 0;
    876 
    877 	el.string.string = str;
    878 	el.string.len = len;
    879 
    880 	return ndb_filter_add_element(filter, el);
    881 }
    882 
    883 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str)
    884 {
    885 	return ndb_filter_add_str_element_len(filter, str, strlen(str));
    886 }
    887 
    888 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer)
    889 {
    890 	union ndb_filter_element el;
    891 	struct ndb_filter_elements *current;
    892 	if (!(current = ndb_filter_current_element(filter)))
    893 		return 0;
    894 
    895 	switch (current->field.type) {
    896 	case NDB_FILTER_IDS:
    897 	case NDB_FILTER_AUTHORS:
    898 	case NDB_FILTER_TAGS:
    899 		return 0;
    900 	case NDB_FILTER_KINDS:
    901 	case NDB_FILTER_SINCE:
    902 	case NDB_FILTER_UNTIL:
    903 	case NDB_FILTER_LIMIT:
    904 		break;
    905 	}
    906 
    907 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT))
    908 		return 0;
    909 
    910 	el.integer = integer;
    911 
    912 	return ndb_filter_add_element(filter, el);
    913 }
    914 
    915 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id)
    916 {
    917 	union ndb_filter_element el;
    918 	struct ndb_filter_elements *current;
    919 
    920 	if (!(current = ndb_filter_current_element(filter)))
    921 		return 0;
    922 
    923 	// only certain filter types allow pushing id elements
    924 	switch (current->field.type) {
    925 	case NDB_FILTER_SINCE:
    926 	case NDB_FILTER_UNTIL:
    927 	case NDB_FILTER_LIMIT:
    928 	case NDB_FILTER_KINDS:
    929 		return 0;
    930 	case NDB_FILTER_IDS:
    931 	case NDB_FILTER_AUTHORS:
    932 	case NDB_FILTER_TAGS:
    933 		break;
    934 	}
    935 
    936 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID))
    937 		return 0;
    938 
    939 	// this is needed so that generic filters know its an id
    940 	el.id = id;
    941 
    942 	return ndb_filter_add_element(filter, el);
    943 }
    944 
// Checks whether any tag of `note` satisfies the tag filter field `els`.
//
// A tag is a candidate when it has at least two entries and its first entry
// is a packed single-character string equal to the filter's tag character.
// The tag's second entry is then compared against every element of the
// filter field: by 32-byte memcmp for id fields, by strcmp for string
// fields. Returns 1 on the first match and 0 if nothing matches or the
// filter field's element type is unsupported.
static int ndb_tag_filter_matches(struct ndb_filter *filter,
				  struct ndb_filter_elements *els,
				  struct ndb_note *note)
{
	int i;
	const unsigned char *id;
	const char *el_str;
	struct ndb_iterator iter, *it = &iter;
	struct ndb_str str;

	ndb_tags_iterate_start(note, it);

	while (ndb_tags_iterate_next(it)) {
		// we're looking for tags with 2 or more entries: ["p", id], etc
		if (it->tag->count < 2)
			continue;

		str = ndb_tag_str(note, it->tag, 0);

		// we only care about packed strings (single char, etc)
		if (str.flag != NDB_PACKED_STR)
			continue;

		// do we have #e matching e (or p, etc)
		if (str.str[0] != els->field.tag || str.str[1] != 0)
			continue;

		// from here on `str` is the tag's value (second entry)
		str = ndb_tag_str(note, it->tag, 1);

		// NOTE: `continue` inside this switch skips to the next tag of
		// the enclosing while loop
		switch (els->field.elem_type) {
		case NDB_ELEMENT_ID:
			// if our filter element type is an id, then we
			// expect a packed id in the tag, otherwise skip
			if (str.flag != NDB_PACKED_ID)
				continue;
			break;
		case NDB_ELEMENT_STRING:
			// if our filter element type is a string, then
			// we should not expect an id
			if (str.flag == NDB_PACKED_ID)
				continue;

			break;
		case NDB_ELEMENT_UNKNOWN:
		default:
			// For some reason the element type is not set. It's
			// possible nothing was added to the generic filter?
			// Let's just fail here and log a note for debugging
			fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type);
			return 0;
		}

		// compare the tag value against each element of the filter field
		for (i = 0; i < els->count; i++) {
			switch (els->field.elem_type) {
			case NDB_ELEMENT_ID:
				id = ndb_filter_get_id_element(filter, els, i);
				if (!memcmp(id, str.id, 32))
					return 1;
				break;
			case NDB_ELEMENT_STRING:
				el_str = ndb_filter_get_string_element(filter, els, i);
				if (!strcmp(el_str, str.str))
					return 1;
				break;
			case NDB_ELEMENT_INT:
				// int elements int tag queries are not supported
			case NDB_ELEMENT_UNKNOWN:
				return 0;
			}
		}
	}

	return 0;
}
   1019 
// Context passed as the bsearch() key when looking up a 32-byte value among
// a filter's id/author elements (consumed by search_ids).
struct search_id_state {
	struct ndb_filter *filter;       // filter whose element data is searched
	struct ndb_filter_elements *els; // element list currently being searched
	unsigned char *key;              // 32-byte value being looked for
};
   1025 
   1026 static int compare_ids(const void *pa, const void *pb)
   1027 {
   1028         unsigned char *a = *(unsigned char **)pa;
   1029         unsigned char *b = *(unsigned char **)pb;
   1030 
   1031         return memcmp(a, b, 32);
   1032 }
   1033 
   1034 static int search_ids(const void *ctx, const void *mid_ptr)
   1035 {
   1036 	struct search_id_state *state;
   1037 	unsigned char *mid_id;
   1038 	uint32_t mid;
   1039 
   1040 	state = (struct search_id_state *)ctx;
   1041 	mid = *(uint32_t *)mid_ptr;
   1042 
   1043 	mid_id = ndb_filter_elements_data(state->filter, mid);
   1044 	assert(mid_id);
   1045 
   1046 	return memcmp(state->key, mid_id, 32);
   1047 }
   1048 
   1049 static int compare_kinds(const void *pa, const void *pb)
   1050 {
   1051 
   1052 	// NOTE: this should match type in `union ndb_filter_element`
   1053 	uint64_t a = *(uint64_t *)pa;
   1054 	uint64_t b = *(uint64_t *)pb;
   1055 
   1056 	if (a < b) {
   1057 		return -1;
   1058 	} else if (a > b) {
   1059 		return 1;
   1060 	} else {
   1061 		return 0;
   1062 	}
   1063 }
   1064 
   1065 //
   1066 // returns 1 if a filter matches a note
   1067 static int ndb_filter_matches_with(struct ndb_filter *filter,
   1068 				   struct ndb_note *note, int already_matched)
   1069 {
   1070 	int i, j;
   1071 	struct ndb_filter_elements *els;
   1072 	struct search_id_state state;
   1073 
   1074 	state.filter = filter;
   1075 
   1076 	for (i = 0; i < filter->num_elements; i++) {
   1077 		els = ndb_filter_get_elements(filter, i);
   1078 		state.els = els;
   1079 		assert(els);
   1080 
   1081 		// if we know we already match from a query scan result,
   1082 		// we can skip this check
   1083 		if ((1 << els->field.type) & already_matched)
   1084 			continue;
   1085 
   1086 		switch (els->field.type) {
   1087 		case NDB_FILTER_KINDS:
   1088 			for (j = 0; j < els->count; j++) {
   1089 				if ((unsigned int)els->elements[j] == note->kind)
   1090 					goto cont;
   1091 			}
   1092 			break;
   1093 		case NDB_FILTER_IDS:
   1094 			state.key = ndb_note_id(note);
   1095 			if (bsearch(&state, &els->elements[0], els->count,
   1096 				    sizeof(els->elements[0]), search_ids)) {
   1097 				continue;
   1098 			}
   1099 			break;
   1100 		case NDB_FILTER_AUTHORS:
   1101 			state.key = ndb_note_pubkey(note);
   1102 			if (bsearch(&state, &els->elements[0], els->count,
   1103 				    sizeof(els->elements[0]), search_ids)) {
   1104 				continue;
   1105 			}
   1106 			break;
   1107 		case NDB_FILTER_TAGS:
   1108 			if (ndb_tag_filter_matches(filter, els, note))
   1109 				continue;
   1110 			break;
   1111 		case NDB_FILTER_SINCE:
   1112 			assert(els->count == 1);
   1113 			if (note->created_at >= els->elements[0])
   1114 				continue;
   1115 			break;
   1116 		case NDB_FILTER_UNTIL:
   1117 			assert(els->count == 1);
   1118 			if (note->created_at < els->elements[0])
   1119 				continue;
   1120 		case NDB_FILTER_LIMIT:
   1121 cont:
   1122 			continue;
   1123 		}
   1124 
   1125 		// all need to match
   1126 		return 0;
   1127 	}
   1128 
   1129 	return 1;
   1130 }
   1131 
   1132 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
   1133 {
   1134 	return ndb_filter_matches_with(filter, note, 0);
   1135 }
   1136 
   1137 // because elements are stored as offsets and qsort doesn't support context,
   1138 // we do a dumb thing where we convert elements to pointers and back if we
   1139 // are doing an id or string sort
   1140 static void sort_filter_elements(struct ndb_filter *filter,
   1141 				 struct ndb_filter_elements *els,
   1142 				 int (*cmp)(const void *, const void *))
   1143 {
   1144 	int i;
   1145 
   1146 	assert(ndb_filter_elem_is_ptr(&els->field));
   1147 
   1148 	for (i = 0; i < els->count; i++)
   1149 		els->elements[i] += (uint64_t)filter->data_buf.start;
   1150 
   1151 	qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp);
   1152 
   1153 	for (i = 0; i < els->count; i++)
   1154 		els->elements[i] -= (uint64_t)filter->data_buf.start;
   1155 }
   1156 
   1157 void ndb_filter_end_field(struct ndb_filter *filter)
   1158 {
   1159 	int cur_offset;
   1160 	struct ndb_filter_elements *cur;
   1161 
   1162 	cur_offset = filter->current;
   1163 
   1164 	if (!(cur = ndb_filter_current_element(filter)))
   1165 		return;
   1166 
   1167 	filter->elements[filter->num_elements++] = cur_offset;
   1168 
   1169 	// sort elements for binary search
   1170 	switch (cur->field.type) {
   1171 	case NDB_FILTER_IDS:
   1172 	case NDB_FILTER_AUTHORS:
   1173 		sort_filter_elements(filter, cur, compare_ids);
   1174 		break;
   1175 	case NDB_FILTER_KINDS:
   1176 		qsort(&cur->elements[0], cur->count,
   1177 		      sizeof(cur->elements[0]), compare_kinds);
   1178 		break;
   1179 	case NDB_FILTER_TAGS:
   1180 		// TODO: generic tag search sorting
   1181 		break;
   1182 	case NDB_FILTER_SINCE:
   1183 	case NDB_FILTER_UNTIL:
   1184 	case NDB_FILTER_LIMIT:
   1185 		// don't need to sort these
   1186 		break;
   1187 	}
   1188 
   1189 	filter->current = -1;
   1190 
   1191 }
   1192 
   1193 static void ndb_filter_group_init(struct ndb_filter_group *group)
   1194 {
   1195 	group->num_filters = 0;
   1196 }
   1197 
   1198 static int ndb_filter_group_add(struct ndb_filter_group *group,
   1199 				struct ndb_filter *filter)
   1200 {
   1201 	if (group->num_filters + 1 > MAX_FILTERS)
   1202 		return 0;
   1203 
   1204 	return ndb_filter_clone(&group->filters[group->num_filters++], filter);
   1205 }
   1206 
   1207 static int ndb_filter_group_matches(struct ndb_filter_group *group,
   1208 				    struct ndb_note *note)
   1209 {
   1210 	int i;
   1211 	struct ndb_filter *filter;
   1212 
   1213 	if (group->num_filters == 0)
   1214 		return 1;
   1215 
   1216 	for (i = 0; i < group->num_filters; i++) {
   1217 		filter = &group->filters[i];
   1218 
   1219 		if (ndb_filter_matches(filter, note))
   1220 			return 1;
   1221 	}
   1222 
   1223 	return 0;
   1224 }
   1225 
   1226 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
   1227 			        uint64_t timestamp, const char *search)
   1228 {
   1229 	memcpy(key->id, id, 32);
   1230 	key->timestamp = timestamp;
   1231 	lowercase_strncpy(key->search, search, sizeof(key->search) - 1);
   1232 	key->search[sizeof(key->search) - 1] = '\0';
   1233 }
   1234 
   1235 static int ndb_write_profile_search_index(struct ndb_txn *txn,
   1236 					  struct ndb_search_key *index_key,
   1237 					  uint64_t profile_key)
   1238 {
   1239 	int rc;
   1240 	MDB_val key, val;
   1241 
   1242 	key.mv_data = index_key;
   1243 	key.mv_size = sizeof(*index_key);
   1244 	val.mv_data = &profile_key;
   1245 	val.mv_size = sizeof(profile_key);
   1246 
   1247 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
   1248 			  &key, &val, 0)))
   1249 	{
   1250 		ndb_debug("ndb_write_profile_search_index failed: %s\n",
   1251 			  mdb_strerror(rc));
   1252 		return 0;
   1253 	}
   1254 
   1255 	return 1;
   1256 }
   1257 
   1258 
   1259 // map usernames and display names to profile keys for user searching
   1260 static int ndb_write_profile_search_indices(struct ndb_txn *txn,
   1261 					    struct ndb_note *note,
   1262 					    uint64_t profile_key,
   1263 					    void *profile_root)
   1264 {
   1265 	struct ndb_search_key index;
   1266 	NdbProfileRecord_table_t profile_record;
   1267 	NdbProfile_table_t profile;
   1268 
   1269 	profile_record = NdbProfileRecord_as_root(profile_root);
   1270 	profile = NdbProfileRecord_profile_get(profile_record);
   1271 
   1272 	const char *name = NdbProfile_name_get(profile);
   1273 	const char *display_name = NdbProfile_display_name_get(profile);
   1274 
   1275 	// words + pubkey + created
   1276 	if (name) {
   1277 		ndb_make_search_key(&index, note->pubkey, note->created_at,
   1278 				    name);
   1279 		if (!ndb_write_profile_search_index(txn, &index, profile_key))
   1280 			return 0;
   1281 	}
   1282 
   1283 	if (display_name) {
   1284 		// don't write the same name/display_name twice
   1285 		if (name && !strcmp(display_name, name)) {
   1286 			return 1;
   1287 		}
   1288 		ndb_make_search_key(&index, note->pubkey, note->created_at,
   1289 				    display_name);
   1290 		if (!ndb_write_profile_search_index(txn, &index, profile_key))
   1291 			return 0;
   1292 	}
   1293 
   1294 	return 1;
   1295 }
   1296 
   1297 
   1298 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags)
   1299 {
   1300 	txn->lmdb = &ndb->lmdb;
   1301 	MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn;
   1302 	if (!txn->lmdb->env)
   1303 		return 0;
   1304 	return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0;
   1305 }
   1306 
   1307 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
   1308 {
   1309 	return _ndb_begin_query(ndb, txn, MDB_RDONLY);
   1310 }
   1311 
   1312 // this should only be used in migrations, etc
   1313 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn)
   1314 {
   1315 	return _ndb_begin_query(ndb, txn, 0);
   1316 }
   1317 
   1318 
   1319 // Migrations
   1320 //
   1321 
   1322 static int ndb_migrate_user_search_indices(struct ndb *ndb)
   1323 {
   1324 	int rc;
   1325 	MDB_cursor *cur;
   1326 	MDB_val k, v;
   1327 	void *profile_root;
   1328 	NdbProfileRecord_table_t record;
   1329 	struct ndb_txn txn;
   1330 	struct ndb_note *note;
   1331 	uint64_t note_key, profile_key;
   1332 	size_t len;
   1333 	int count;
   1334 
   1335 	if (!ndb_begin_rw_query(ndb, &txn)) {
   1336 		fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n");
   1337 		return 0;
   1338 	}
   1339 
   1340 	if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) {
   1341 		fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc);
   1342 		return 0;
   1343 	}
   1344 
   1345 	count = 0;
   1346 
   1347 	// loop through all profiles and write search indices
   1348 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   1349 		profile_root = v.mv_data;
   1350 		profile_key = *((uint64_t*)k.mv_data);
   1351 		record = NdbProfileRecord_as_root(profile_root);
   1352 		note_key = NdbProfileRecord_note_key(record);
   1353 		note = ndb_get_note_by_key(&txn, note_key, &len);
   1354 
   1355 		if (note == NULL) {
   1356 			fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n");
   1357 			return 0;
   1358 		}
   1359 
   1360 		if (!ndb_write_profile_search_indices(&txn, note, profile_key,
   1361 						      profile_root)) {
   1362 
   1363 			fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
   1364 			return 0;
   1365 		}
   1366 
   1367 		count++;
   1368 	}
   1369 
   1370 	fprintf(stderr, "migrated %d profiles to include search indices\n", count);
   1371 
   1372 	mdb_cursor_close(cur);
   1373 
   1374 	ndb_end_query(&txn);
   1375 
   1376 	return 1;
   1377 }
   1378 
   1379 static int ndb_migrate_lower_user_search_indices(struct ndb *ndb)
   1380 {
   1381 	MDB_txn *txn;
   1382 
   1383 	if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) {
   1384 		fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n");
   1385 		return 0;
   1386 	}
   1387 
   1388 	// just drop the search db so we can rebuild it
   1389 	if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) {
   1390 		fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n");
   1391 		return 0;
   1392 	}
   1393 
   1394 	mdb_txn_commit(txn);
   1395 
   1396 	return ndb_migrate_user_search_indices(ndb);
   1397 }
   1398 
// forward declaration: used by the migrations below, defined later in this file
int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile);
   1400 
   1401 
   1402 int ndb_db_version(struct ndb *ndb)
   1403 {
   1404 	int rc;
   1405 	uint64_t version, version_key;
   1406 	MDB_val k, v;
   1407 	MDB_txn *txn;
   1408 
   1409 	version_key = NDB_META_KEY_VERSION;
   1410 	k.mv_data = &version_key;
   1411 	k.mv_size = sizeof(version_key);
   1412 
   1413 	if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) {
   1414 		fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc);
   1415 		return -1;
   1416 	}
   1417 
   1418 	if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) {
   1419 		version = -1;
   1420 	} else {
   1421 		if (v.mv_size != 8) {
   1422 			fprintf(stderr, "run_migrations: invalid version size?");
   1423 			return 0;
   1424 		}
   1425 		version = *((uint64_t*)v.mv_data);
   1426 	}
   1427 
   1428 	mdb_txn_abort(txn);
   1429 	return version;
   1430 }
   1431 
   1432 // custom kind+timestamp comparison function. This is used by lmdb to perform
   1433 // b+ tree searches over the kind+timestamp index
   1434 static int ndb_u64_tsid_compare(const MDB_val *a, const MDB_val *b)
   1435 {
   1436 	struct ndb_u64_tsid *tsa, *tsb;
   1437 	tsa = a->mv_data;
   1438 	tsb = b->mv_data;
   1439 
   1440 	if (tsa->u64 < tsb->u64)
   1441 		return -1;
   1442 	else if (tsa->u64 > tsb->u64)
   1443 		return 1;
   1444 
   1445 	if (tsa->timestamp < tsb->timestamp)
   1446 		return -1;
   1447 	else if (tsa->timestamp > tsb->timestamp)
   1448 		return 1;
   1449 
   1450 	return 0;
   1451 }
   1452 
   1453 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b)
   1454 {
   1455 	struct ndb_tsid *tsa, *tsb;
   1456 	MDB_val a2 = *a, b2 = *b;
   1457 
   1458 	a2.mv_size = sizeof(tsa->id);
   1459 	b2.mv_size = sizeof(tsb->id);
   1460 
   1461 	int cmp = mdb_cmp_memn(&a2, &b2);
   1462 	if (cmp) return cmp;
   1463 
   1464 	tsa = a->mv_data;
   1465 	tsb = b->mv_data;
   1466 
   1467 	if (tsa->timestamp < tsb->timestamp)
   1468 		return -1;
   1469 	else if (tsa->timestamp > tsb->timestamp)
   1470 		return 1;
   1471 	return 0;
   1472 }
   1473 
   1474 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id)
   1475 {
   1476 	memcpy(key->id, id, 32);
   1477 	key->timestamp = 0;
   1478 }
   1479 
   1480 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id,
   1481 				 uint64_t timestamp)
   1482 {
   1483 	memcpy(key->id, id, 32);
   1484 	key->timestamp = timestamp;
   1485 }
   1486 
   1487 static inline void ndb_u64_tsid_init(struct ndb_u64_tsid *key, uint64_t integer,
   1488 				     uint64_t timestamp)
   1489 {
   1490 	key->u64 = integer;
   1491 	key->timestamp = timestamp;
   1492 }
   1493 
   1494 // useful for range-searching for the latest key with a clustered created_at timen
   1495 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id)
   1496 {
   1497 	memcpy(key->id, id, 32);
   1498 	key->timestamp = UINT64_MAX;
   1499 }
   1500 
// Kinds of messages that can be placed on the ingester queue.
enum ndb_ingester_msgtype {
	NDB_INGEST_EVENT, // write json to the ingester queue for processing
	NDB_INGEST_QUIT,  // kill ingester thread immediately
};
   1505 
// Payload of an NDB_INGEST_EVENT message: a json buffer plus its length.
// `client` and `len` share a single 32-bit bitfield.
struct ndb_ingester_event {
	char *json;
	unsigned client : 1; // ["EVENT", {...}] messages
	unsigned len : 31;   // NOTE(review): presumably the byte length of json -- confirm
};
   1511 
// A note handed to the writer, together with its size in bytes.
struct ndb_writer_note {
	struct ndb_note *note;
	size_t note_len;
};
   1516 
// A profile note handed to the writer: the note itself plus the profile
// record builder produced for it.
struct ndb_writer_profile {
	struct ndb_writer_note note;
	struct ndb_profile_record_builder record;
};
   1521 
// A message on the ingester queue: a type tag plus the payload for that type.
struct ndb_ingester_msg {
	enum ndb_ingester_msgtype type;
	union {
		struct ndb_ingester_event event; // payload for NDB_INGEST_EVENT
	};
};
   1528 
// Writer payload carrying a database version for the ndb meta store.
struct ndb_writer_ndb_meta {
	// these are 64 bit because I'm paranoid of db-wide alignment issues
	uint64_t version;
};
   1533 
// Used in the writer thread when writing ndb_profile_fetch_records
//   kv = pubkey: record
struct ndb_writer_last_fetch {
	unsigned char pubkey[32]; // key: the profile's pubkey
	uint64_t fetched_at;      // value: when the profile was last fetched
};
   1540 
// Writer payload for storing note blocks: the blocks and the key of the note
// they belong to.
struct ndb_writer_blocks {
	struct ndb_blocks *blocks;
	uint64_t note_key;
};
   1546 
// The different types of messages that the writer thread can write to the
// database. `type` selects which member of the union is meaningful.
struct ndb_writer_msg {
	enum ndb_writer_msgtype type;
	union {
		struct ndb_writer_note note;             // a note to store
		struct ndb_writer_profile profile;       // a profile note plus its record builder
		struct ndb_writer_ndb_meta ndb_meta;     // database version metadata
		struct ndb_writer_last_fetch last_fetch; // last profile fetch time for a pubkey
		struct ndb_writer_blocks blocks;         // note blocks for a note key
	};
};
   1559 
   1560 static inline int ndb_writer_queue_msg(struct ndb_writer *writer,
   1561 				       struct ndb_writer_msg *msg)
   1562 {
   1563 	return prot_queue_push(&writer->inbox, msg);
   1564 }
   1565 
   1566 static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
   1567 {
   1568 	int rc;
   1569 	MDB_cursor *cur;
   1570 	MDB_val k, v;
   1571 	void *profile_root;
   1572 	NdbProfileRecord_table_t record;
   1573 	struct ndb_txn txn;
   1574 	struct ndb_note *note, *copied_note;
   1575 	uint64_t note_key;
   1576 	size_t len;
   1577 	int count, failed;
   1578 	struct ndb_writer_msg out;
   1579 
   1580 	if (!ndb_begin_rw_query(ndb, &txn)) {
   1581 		fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n");
   1582 		return 0;
   1583 	}
   1584 
   1585 	if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) {
   1586 		fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc);
   1587 		return 0;
   1588 	}
   1589 
   1590 	count = 0;
   1591 	failed = 0;
   1592 
   1593 	// loop through all profiles and write search indices
   1594 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   1595 		profile_root = v.mv_data;
   1596 		record = NdbProfileRecord_as_root(profile_root);
   1597 		note_key = NdbProfileRecord_note_key(record);
   1598 		note = ndb_get_note_by_key(&txn, note_key, &len);
   1599 
   1600 		if (note == NULL) {
   1601 			fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n");
   1602 			return 0;
   1603 		}
   1604 
   1605 		struct ndb_profile_record_builder *b = &out.profile.record;
   1606 
   1607 		// reprocess profile
   1608 		if (!ndb_process_profile_note(note, b)) {
   1609 			failed++;
   1610 			continue;
   1611 		}
   1612 
   1613 		// the writer needs to own this note, and its expected to free it
   1614 		copied_note = malloc(len);
   1615 		memcpy(copied_note, note, len);
   1616 
   1617 		out.type = NDB_WRITER_PROFILE;
   1618 		out.profile.note.note = copied_note;
   1619 		out.profile.note.note_len = len;
   1620 
   1621 		ndb_writer_queue_msg(&ndb->writer, &out);
   1622 
   1623 		count++;
   1624 	}
   1625 
   1626 	fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count);
   1627 
   1628 	if (failed != 0) {
   1629 		fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed);
   1630 	}
   1631 
   1632 	mdb_cursor_close(cur);
   1633 
   1634 	ndb_end_query(&txn);
   1635 
   1636 	return 1;
   1637 }
   1638 
// Table of migration functions.
// NOTE(review): presumably run in array order, with the array index tied to
// the stored db version -- confirm against the code that consumes this table
static struct ndb_migration MIGRATIONS[] = {
	{ .fn = ndb_migrate_user_search_indices },
	{ .fn = ndb_migrate_lower_user_search_indices },
	{ .fn = ndb_migrate_utf8_profile_names }
};
   1644 
   1645 
   1646 int ndb_end_query(struct ndb_txn *txn)
   1647 {
   1648 	// this works on read or write queries.
   1649 	return mdb_txn_commit(txn->mdb_txn) == 0;
   1650 }
   1651 
   1652 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
   1653 		    unsigned char sig[64])
   1654 {
   1655 	secp256k1_xonly_pubkey xonly_pubkey;
   1656 	int ok;
   1657 
   1658 	ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey,
   1659 					  pubkey) != 0;
   1660 	if (!ok) return 0;
   1661 
   1662 	ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32,
   1663 					 &xonly_pubkey) > 0;
   1664 	if (!ok) return 0;
   1665 
   1666 	return 1;
   1667 }
   1668 
   1669 static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
   1670 					struct ndb_writer_msg *msgs,
   1671 					int num_msgs)
   1672 {
   1673 	return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
   1674 }
   1675 
   1676 static int ndb_writer_queue_note(struct ndb_writer *writer,
   1677 				 struct ndb_note *note, size_t note_len)
   1678 {
   1679 	struct ndb_writer_msg msg;
   1680 	msg.type = NDB_WRITER_NOTE;
   1681 
   1682 	msg.note.note = note;
   1683 	msg.note.note_len = note_len;
   1684 
   1685 	return prot_queue_push(&writer->inbox, &msg);
   1686 }
   1687 
   1688 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
   1689 					  const unsigned char *pubkey,
   1690 					  uint64_t fetched_at)
   1691 {
   1692 	int rc;
   1693 	MDB_val key, val;
   1694 
   1695 	key.mv_data = (unsigned char*)pubkey;
   1696 	key.mv_size = 32;
   1697 	val.mv_data = &fetched_at;
   1698 	val.mv_size = sizeof(fetched_at);
   1699 
   1700 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH],
   1701 			  &key, &val, 0)))
   1702 	{
   1703 		ndb_debug("write version to ndb_meta failed: %s\n",
   1704 				mdb_strerror(rc));
   1705 		return;
   1706 	}
   1707 
   1708 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
   1709 }
   1710 
   1711 
   1712 // We just received a profile that we haven't processed yet, but it could
   1713 // be an older one! Make sure we only write last fetched profile if it's a new
   1714 // one
   1715 //
   1716 // To do this, we first check the latest profile in the database. If the
   1717 // created_date for this profile note is newer, then we write a
   1718 // last_profile_fetch record, otherwise we do not.
   1719 //
   1720 // WARNING: This function is only valid when called from the writer thread
   1721 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn,
   1722 					       struct ndb_note *note)
   1723 {
   1724 	size_t len;
   1725 	uint64_t profile_key, note_key;
   1726 	void *root;
   1727 	struct ndb_note *last_profile;
   1728 	NdbProfileRecord_table_t record;
   1729 
   1730 	if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) {
   1731 		record = NdbProfileRecord_as_root(root);
   1732 		note_key = NdbProfileRecord_note_key(record);
   1733 		last_profile = ndb_get_note_by_key(txn, note_key, &len);
   1734 		if (last_profile == NULL) {
   1735 			return 0;
   1736 		}
   1737 
   1738 		// found profile, let's see if it's newer than ours
   1739 		if (note->created_at > last_profile->created_at) {
   1740 			// this is a new profile note, record last fetched time
   1741 			ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
   1742 		}
   1743 	} else {
   1744 		// couldn't fetch profile. record last fetched time
   1745 		ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
   1746 	}
   1747 
   1748 	return 1;
   1749 }
   1750 
   1751 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
   1752 				 uint64_t fetched_at)
   1753 {
   1754 	struct ndb_writer_msg msg;
   1755 	msg.type = NDB_WRITER_PROFILE_LAST_FETCH;
   1756 	memcpy(&msg.last_fetch.pubkey[0], pubkey, 32);
   1757 	msg.last_fetch.fetched_at = fetched_at;
   1758 
   1759 	return ndb_writer_queue_msg(&ndb->writer, &msg);
   1760 }
   1761 
   1762 
   1763 // When doing cursor scans from greatest to lowest, this function positions the
   1764 // cursor at the first element before descending. MDB_SET_RANGE puts us right
   1765 // after the first element, so we have to go back one.
   1766 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v)
   1767 {
   1768 	// Position cursor at the next key greater than or equal to the
   1769 	// specified key
   1770 	if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) {
   1771 		// Failed :(. It could be the last element?
   1772 		if (mdb_cursor_get(cur, k, v, MDB_LAST))
   1773 			return 0;
   1774 	} else {
   1775 		// if set range worked and our key exists, it should be
   1776 		// the one right before this one
   1777 		if (mdb_cursor_get(cur, k, v, MDB_PREV))
   1778 			return 0;
   1779 	}
   1780 
   1781 	return 1;
   1782 }
   1783 
   1784 // get some value based on a clustered id key
   1785 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id,
   1786 		 MDB_val *val)
   1787 {
   1788 	MDB_val k, v;
   1789 	MDB_cursor *cur;
   1790 	int success = 0, rc;
   1791 	struct ndb_tsid tsid;
   1792 
   1793 	// position at the most recent
   1794 	ndb_tsid_high(&tsid, id);
   1795 
   1796 	k.mv_data = &tsid;
   1797 	k.mv_size = sizeof(tsid);
   1798 
   1799 	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) {
   1800 		ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc));
   1801 		return 0;
   1802 	}
   1803 
   1804 	if (!ndb_cursor_start(cur, &k, &v))
   1805 		goto cleanup;
   1806 
   1807 	if (memcmp(k.mv_data, id, 32) == 0) {
   1808 		*val = v;
   1809 		success = 1;
   1810 	}
   1811 
   1812 cleanup:
   1813 	mdb_cursor_close(cur);
   1814 	return success;
   1815 }
   1816 
   1817 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key,
   1818 			       enum ndb_dbs store, size_t *len)
   1819 {
   1820 	MDB_val k, v;
   1821 
   1822 	k.mv_data = &key;
   1823 	k.mv_size = sizeof(key);
   1824 
   1825 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
   1826 		ndb_debug("ndb_lookup_by_key: mdb_get note failed\n");
   1827 		return NULL;
   1828 	}
   1829 
   1830 	if (len)
   1831 		*len = v.mv_size;
   1832 
   1833 	return v.mv_data;
   1834 }
   1835 
   1836 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
   1837 			     enum ndb_dbs store, const unsigned char *pk,
   1838 			     size_t *len, uint64_t *primkey)
   1839 {
   1840 	MDB_val k, v;
   1841 	void *res = NULL;
   1842 	if (len)
   1843 		*len = 0;
   1844 
   1845 	if (!ndb_get_tsid(txn, ind, pk, &k)) {
   1846 		//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
   1847 		return 0;
   1848 	}
   1849 
   1850 	if (primkey)
   1851 		*primkey = *(uint64_t*)k.mv_data;
   1852 
   1853 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
   1854 		ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
   1855 		return 0;
   1856 	}
   1857 
   1858 	res = v.mv_data;
   1859 	assert(((uint64_t)res % 4) == 0);
   1860 	if (len)
   1861 		*len = v.mv_size;
   1862 	return res;
   1863 }
   1864 
   1865 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key)
   1866 {
   1867 	return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key);
   1868 }
   1869 
   1870 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key)
   1871 {
   1872 	return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key);
   1873 }
   1874 
   1875 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn,
   1876 					      enum ndb_dbs db,
   1877 					      const unsigned char *id)
   1878 {
   1879 	MDB_val k;
   1880 
   1881 	if (!ndb_get_tsid(txn, db, id, &k))
   1882 		return 0;
   1883 
   1884 	return *(uint32_t*)k.mv_data;
   1885 }
   1886 
   1887 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id)
   1888 {
   1889 	return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id);
   1890 }
   1891 
   1892 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id)
   1893 {
   1894 	return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id);
   1895 }
   1896 
   1897 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
   1898 {
   1899 	return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len);
   1900 }
   1901 
   1902 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
   1903 {
   1904 	return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len);
   1905 }
   1906 
   1907 uint64_t
   1908 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey)
   1909 {
   1910 	MDB_val k, v;
   1911 
   1912 	k.mv_data = (unsigned char*)pubkey;
   1913 	k.mv_size = 32;
   1914 
   1915 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) {
   1916 		//ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n");
   1917 		return 0;
   1918 	}
   1919 
   1920 	return *((uint64_t*)v.mv_data);
   1921 }
   1922 
   1923 
   1924 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id)
   1925 {
   1926 	MDB_val val;
   1927 
   1928 	if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val))
   1929 		return 0;
   1930 
   1931 	return 1;
   1932 }
   1933 
   1934 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb,
   1935 			     MDB_txn *mdb_txn)
   1936 {
   1937 	txn->lmdb = lmdb;
   1938 	txn->mdb_txn = mdb_txn;
   1939 }
   1940 
   1941 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
   1942 {
   1943 	unsigned char id[32];
   1944 	struct ndb_ingest_controller *c = data;
   1945 	struct ndb_txn txn;
   1946 
   1947 	hex_decode(hexid, 64, id, sizeof(id));
   1948 
   1949 	// let's see if we already have it
   1950 
   1951 	ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
   1952 	if (!ndb_has_note(&txn, id))
   1953 		return NDB_IDRES_CONT;
   1954 
   1955 	return NDB_IDRES_STOP;
   1956 }
   1957 
   1958 static int ndbprofile_parse_json(flatcc_builder_t *B,
   1959         const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile)
   1960 {
   1961 	flatcc_json_parser_t parser, *ctx = &parser;
   1962 	flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags);
   1963 
   1964 	if (flatcc_builder_start_buffer(B, 0, 0, 0))
   1965 		return 0;
   1966 
   1967 	NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile);
   1968 	if (ctx->error)
   1969 		return 0;
   1970 
   1971 	if (!flatcc_builder_end_buffer(B, *profile))
   1972 		return 0;
   1973 
   1974 	ctx->end_loc = buf;
   1975 
   1976 
   1977 	return 1;
   1978 }
   1979 
   1980 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b)
   1981 {
   1982 	b->builder = malloc(sizeof(*b->builder));
   1983 	b->flatbuf = NULL;
   1984 }
   1985 
   1986 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b)
   1987 {
   1988 	if (b->builder)
   1989 		free(b->builder);
   1990 	if (b->flatbuf)
   1991 		free(b->flatbuf);
   1992 
   1993 	b->builder = NULL;
   1994 	b->flatbuf = NULL;
   1995 }
   1996 
   1997 int ndb_process_profile_note(struct ndb_note *note,
   1998 			     struct ndb_profile_record_builder *profile)
   1999 {
   2000 	int res;
   2001 
   2002 	NdbProfile_ref_t profile_table;
   2003 	flatcc_builder_t *builder;
   2004 
   2005 	ndb_profile_record_builder_init(profile);
   2006 	builder = profile->builder;
   2007 	flatcc_builder_init(builder);
   2008 
   2009 	NdbProfileRecord_start_as_root(builder);
   2010 
   2011 	//printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note));
   2012 	if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note),
   2013 					  note->content_length,
   2014 					  flatcc_json_parser_f_skip_unknown,
   2015 					  &profile_table)))
   2016 	{
   2017 		ndb_debug("profile_parse_json failed %d '%.*s'\n", res,
   2018 			  note->content_length, ndb_note_content(note));
   2019 		ndb_profile_record_builder_free(profile);
   2020 		return 0;
   2021 	}
   2022 
   2023 	uint64_t received_at = time(NULL);
   2024 	const char *lnurl = "fixme";
   2025 
   2026 	NdbProfileRecord_profile_add(builder, profile_table);
   2027 	NdbProfileRecord_received_at_add(builder, received_at);
   2028 
   2029 	flatcc_builder_ref_t lnurl_off;
   2030 	lnurl_off = flatcc_builder_create_string_str(builder, lnurl);
   2031 
   2032 	NdbProfileRecord_lnurl_add(builder, lnurl_off);
   2033 
   2034 	//*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len);
   2035 	return 1;
   2036 }
   2037 
   2038 static int ndb_ingester_process_note(secp256k1_context *ctx,
   2039 				     struct ndb_note *note,
   2040 				     size_t note_size,
   2041 				     struct ndb_writer_msg *out,
   2042 				     struct ndb_ingester *ingester)
   2043 {
   2044 	enum ndb_ingest_filter_action action;
   2045 	action = NDB_INGEST_ACCEPT;
   2046 
   2047 	if (ingester->filter)
   2048 		action = ingester->filter(ingester->filter_context, note);
   2049 
   2050 	if (action == NDB_INGEST_REJECT)
   2051 		return 0;
   2052 
   2053 	// some special situations we might want to skip sig validation,
   2054 	// like during large imports
   2055 	if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
   2056 		// if we're skipping validation we don't need to verify
   2057 	} else {
   2058 		// verify! If it's an invalid note we don't need to
   2059 		// bother writing it to the database
   2060 		if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
   2061 			ndb_debug("signature verification failed\n");
   2062 			return 0;
   2063 		}
   2064 	}
   2065 
   2066 	// we didn't find anything. let's send it
   2067 	// to the writer thread
   2068 	note = realloc(note, note_size);
   2069 	assert(((uint64_t)note % 4) == 0);
   2070 
   2071 	if (note->kind == 0) {
   2072 		struct ndb_profile_record_builder *b =
   2073 			&out->profile.record;
   2074 
   2075 		ndb_process_profile_note(note, b);
   2076 
   2077 		out->type = NDB_WRITER_PROFILE;
   2078 		out->profile.note.note = note;
   2079 		out->profile.note.note_len = note_size;
   2080 	} else {
   2081 		out->type = NDB_WRITER_NOTE;
   2082 		out->note.note = note;
   2083 		out->note.note_len = note_size;
   2084 	}
   2085 
   2086 	return 1;
   2087 }
   2088 
   2089 
   2090 static int ndb_ingester_process_event(secp256k1_context *ctx,
   2091 				      struct ndb_ingester *ingester,
   2092 				      struct ndb_ingester_event *ev,
   2093 				      struct ndb_writer_msg *out,
   2094 				      MDB_txn *read_txn
   2095 				      )
   2096 {
   2097 	struct ndb_tce tce;
   2098 	struct ndb_fce fce;
   2099 	struct ndb_note *note;
   2100 	struct ndb_ingest_controller controller;
   2101 	struct ndb_id_cb cb;
   2102 	void *buf;
   2103 	int ok;
   2104 	size_t bufsize, note_size;
   2105 
   2106 	ok = 0;
   2107 
   2108 	// we will use this to check if we already have it in the DB during
   2109 	// ID parsing
   2110 	controller.read_txn = read_txn;
   2111 	controller.lmdb = ingester->writer->lmdb;
   2112 	cb.fn = ndb_ingester_json_controller;
   2113 	cb.data = &controller;
   2114 
   2115 	// since we're going to be passing this allocated note to a different
   2116 	// thread, we can't use thread-local buffers. just allocate a block
   2117         bufsize = max(ev->len * 8.0, 4096);
   2118 	buf = malloc(bufsize);
   2119 	if (!buf) {
   2120 		ndb_debug("couldn't malloc buf\n");
   2121 		return 0;
   2122 	}
   2123 
   2124 	note_size =
   2125 		ev->client ?
   2126 		ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
   2127 		ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
   2128 
   2129 	if ((int)note_size == -42) {
   2130 		// we already have this!
   2131 		//ndb_debug("already have id??\n");
   2132 		goto cleanup;
   2133 	} else if (note_size == 0) {
   2134 		ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json);
   2135 		goto cleanup;
   2136 	}
   2137 
   2138 	//ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json);
   2139 
   2140 	if (ev->client) {
   2141 		switch (fce.evtype) {
   2142 		case NDB_FCE_EVENT:
   2143 			note = fce.event.note;
   2144 			if (note != buf) {
   2145 				ndb_debug("note buffer not equal to malloc'd buffer\n");
   2146 				goto cleanup;
   2147 			}
   2148 
   2149 			if (!ndb_ingester_process_note(ctx, note, note_size,
   2150 						       out, ingester)) {
   2151 				ndb_debug("failed to process note\n");
   2152 				goto cleanup;
   2153 			} else {
   2154 				// we're done with the original json, free it
   2155 				free(ev->json);
   2156 				return 1;
   2157 			}
   2158 		}
   2159 	} else {
   2160 		switch (tce.evtype) {
   2161 		case NDB_TCE_AUTH:   goto cleanup;
   2162 		case NDB_TCE_NOTICE: goto cleanup;
   2163 		case NDB_TCE_EOSE:   goto cleanup;
   2164 		case NDB_TCE_OK:     goto cleanup;
   2165 		case NDB_TCE_EVENT:
   2166 			note = tce.event.note;
   2167 			if (note != buf) {
   2168 				ndb_debug("note buffer not equal to malloc'd buffer\n");
   2169 				goto cleanup;
   2170 			}
   2171 
   2172 			if (!ndb_ingester_process_note(ctx, note, note_size,
   2173 						       out, ingester)) {
   2174 				ndb_debug("failed to process note\n");
   2175 				goto cleanup;
   2176 			} else {
   2177 				// we're done with the original json, free it
   2178 				free(ev->json);
   2179 				return 1;
   2180 			}
   2181 		}
   2182 	}
   2183 
   2184 
   2185 cleanup:
   2186 	free(ev->json);
   2187 	free(buf);
   2188 
   2189 	return ok;
   2190 }
   2191 
   2192 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
   2193 {
   2194 	MDB_cursor *mc;
   2195 	MDB_val key, val;
   2196 
   2197 	if (mdb_cursor_open(txn, db, &mc))
   2198 		return 0;
   2199 
   2200 	if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) {
   2201 		mdb_cursor_close(mc);
   2202 		return 0;
   2203 	}
   2204 
   2205 	mdb_cursor_close(mc);
   2206 
   2207 	assert(key.mv_size == 8);
   2208         return *((uint64_t*)key.mv_data);
   2209 }
   2210 
   2211 //
   2212 // make a search key meant for user queries without any other note info
   2213 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search)
   2214 {
   2215 	memset(key->id, 0, sizeof(key->id));
   2216 	key->timestamp = 0;
   2217 	lowercase_strncpy(key->search, search, sizeof(key->search) - 1);
   2218 	key->search[sizeof(key->search) - 1] = '\0';
   2219 }
   2220 
   2221 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query)
   2222 {
   2223 	int rc;
   2224 	struct ndb_search_key s;
   2225 	MDB_val k, v;
   2226 	search->cursor = NULL;
   2227 
   2228 	MDB_cursor **cursor = (MDB_cursor **)&search->cursor;
   2229 
   2230 	ndb_make_search_key_low(&s, query);
   2231 
   2232 	k.mv_data = &s;
   2233 	k.mv_size = sizeof(s);
   2234 
   2235 	if ((rc = mdb_cursor_open(txn->mdb_txn,
   2236 				  txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
   2237 				  cursor))) {
   2238 		printf("search_profile: cursor opened failed: %s\n",
   2239 				mdb_strerror(rc));
   2240 		return 0;
   2241 	}
   2242 
   2243 	// Position cursor at the next key greater than or equal to the specified key
   2244 	if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) {
   2245 		printf("search_profile: cursor get failed\n");
   2246 		goto cleanup;
   2247 	} else {
   2248 		search->key = k.mv_data;
   2249 		assert(v.mv_size == 8);
   2250 		search->profile_key = *((uint64_t*)v.mv_data);
   2251 		return 1;
   2252 	}
   2253 
   2254 cleanup:
   2255 	mdb_cursor_close(search->cursor);
   2256 	search->cursor = NULL;
   2257 	return 0;
   2258 }
   2259 
   2260 void ndb_search_profile_end(struct ndb_search *search)
   2261 {
   2262 	if (search->cursor)
   2263 		mdb_cursor_close(search->cursor);
   2264 }
   2265 
   2266 int ndb_search_profile_next(struct ndb_search *search)
   2267 {
   2268 	int rc;
   2269 	MDB_val k, v;
   2270 	unsigned char *init_id;
   2271 
   2272 	init_id = search->key->id;
   2273 	k.mv_data = search->key;
   2274 	k.mv_size = sizeof(*search->key);
   2275 
   2276 retry:
   2277 	if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) {
   2278 		ndb_debug("ndb_search_profile_next: %s\n",
   2279 				mdb_strerror(rc));
   2280 		return 0;
   2281 	} else {
   2282 		search->key = k.mv_data;
   2283 		assert(v.mv_size == 8);
   2284 		search->profile_key = *((uint64_t*)v.mv_data);
   2285 
   2286 		// skip duplicate pubkeys
   2287 		if (!memcmp(init_id, search->key->id, 32))
   2288 			goto retry;
   2289 	}
   2290 
   2291 	return 1;
   2292 }
   2293 
   2294 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
   2295 {
   2296 	int cmp;
   2297 	struct ndb_search_key *ska, *skb;
   2298 
   2299 	ska = a->mv_data;
   2300 	skb = b->mv_data;
   2301 
   2302 	MDB_val a2 = *a;
   2303 	MDB_val b2 = *b;
   2304 
   2305 	a2.mv_data = ska->search;
   2306 	a2.mv_size = sizeof(ska->search) + sizeof(ska->id);
   2307 
   2308 	cmp = mdb_cmp_memn(&a2, &b2);
   2309 	if (cmp) return cmp;
   2310 
   2311 	if (ska->timestamp < skb->timestamp)
   2312 		return -1;
   2313 	else if (ska->timestamp > skb->timestamp)
   2314 		return 1;
   2315 
   2316 	return 0;
   2317 }
   2318 
   2319 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key)
   2320 
   2321 {
   2322 	MDB_val key, val;
   2323 	int rc;
   2324 	struct ndb_tsid tsid;
   2325 	MDB_dbi pk_db;
   2326 
   2327 	pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK];
   2328 
   2329 	// write profile_pk + created_at index
   2330 	ndb_tsid_init(&tsid, note->pubkey, note->created_at);
   2331 
   2332 	key.mv_data = &tsid;
   2333 	key.mv_size = sizeof(tsid);
   2334 	val.mv_data = &profile_key;
   2335 	val.mv_size = sizeof(profile_key);
   2336 
   2337 	if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) {
   2338 		ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
   2339 				profile_key, mdb_strerror(rc));
   2340 		return 0;
   2341 	}
   2342 
   2343 	return 1;
   2344 }
   2345 
   2346 static int ndb_write_profile(struct ndb_txn *txn,
   2347 			     struct ndb_writer_profile *profile,
   2348 			     uint64_t note_key)
   2349 {
   2350 	uint64_t profile_key;
   2351 	struct ndb_note *note;
   2352 	void *flatbuf;
   2353 	size_t flatbuf_len;
   2354 	int rc;
   2355 
   2356 	MDB_val key, val;
   2357 	MDB_dbi profile_db;
   2358 
   2359 	note = profile->note.note;
   2360 
   2361 	// add note_key to profile record
   2362 	NdbProfileRecord_note_key_add(profile->record.builder, note_key);
   2363 	NdbProfileRecord_end_as_root(profile->record.builder);
   2364 
   2365 	flatbuf = profile->record.flatbuf =
   2366 		flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len);
   2367 
   2368 	assert(((uint64_t)flatbuf % 8) == 0);
   2369 
   2370 	// TODO: this may not be safe!?
   2371 	flatbuf_len = (flatbuf_len + 7) & ~7;
   2372 
   2373 	//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
   2374 
   2375 	// get dbs
   2376 	profile_db = txn->lmdb->dbs[NDB_DB_PROFILE];
   2377 
   2378 	// get new key
   2379 	profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1;
   2380 
   2381 	// write profile to profile store
   2382 	key.mv_data = &profile_key;
   2383 	key.mv_size = sizeof(profile_key);
   2384 	val.mv_data = flatbuf;
   2385 	val.mv_size = flatbuf_len;
   2386 
   2387 	if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) {
   2388 		ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
   2389 		return 0;
   2390 	}
   2391 
   2392 	// write last fetched record
   2393 	if (!ndb_maybe_write_last_profile_fetch(txn, note)) {
   2394 		ndb_debug("failed to write last profile fetched record\n");
   2395 	}
   2396 
   2397 	// write profile pubkey index
   2398 	if (!ndb_write_profile_pk_index(txn, note, profile_key)) {
   2399 		ndb_debug("failed to write profile pubkey index\n");
   2400 		return 0;
   2401 	}
   2402 
   2403 	// write name, display_name profile search indices
   2404 	if (!ndb_write_profile_search_indices(txn, note, profile_key,
   2405 					      flatbuf)) {
   2406 		ndb_debug("failed to write profile search indices\n");
   2407 		return 0;
   2408 	}
   2409 
   2410 	return 1;
   2411 }
   2412 
   2413 // find the last id tag in a note (e, p, etc)
   2414 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type)
   2415 {
   2416 	unsigned char *last = NULL;
   2417 	struct ndb_iterator iter;
   2418 	struct ndb_str str;
   2419 
   2420 	// get the liked event id (last id)
   2421 	ndb_tags_iterate_start(note, &iter);
   2422 
   2423 	while (ndb_tags_iterate_next(&iter)) {
   2424 		if (iter.tag->count < 2)
   2425 			continue;
   2426 
   2427 		str = ndb_tag_str(note, iter.tag, 0);
   2428 
   2429 		// assign liked to the last e tag
   2430 		if (str.flag == NDB_PACKED_STR && str.str[0] == type) {
   2431 			str = ndb_tag_str(note, iter.tag, 1);
   2432 			if (str.flag == NDB_PACKED_ID)
   2433 				last = str.id;
   2434 		}
   2435 	}
   2436 
   2437 	return last;
   2438 }
   2439 
   2440 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len)
   2441 {
   2442 	MDB_val k, v;
   2443 
   2444 	k.mv_data = (unsigned char*)id;
   2445 	k.mv_size = 32;
   2446 
   2447 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) {
   2448 		//ndb_debug("ndb_get_note_meta: mdb_get note failed\n");
   2449 		return NULL;
   2450 	}
   2451 
   2452 	if (len)
   2453 		*len = v.mv_size;
   2454 
   2455 	return v.mv_data;
   2456 }
   2457 
   2458 // When receiving a reaction note, look for the liked id and increase the
   2459 // reaction counter in the note metadata database
   2460 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
   2461 {
   2462 	size_t len;
   2463 	void *root;
   2464 	int reactions, rc;
   2465 	MDB_val key, val;
   2466 	NdbEventMeta_table_t meta;
   2467 	unsigned char *liked = ndb_note_last_id_tag(note, 'e');
   2468 
   2469 	if (liked == NULL)
   2470 		return 0;
   2471 
   2472 	root = ndb_get_note_meta(txn, liked, &len);
   2473 
   2474 	flatcc_builder_t builder;
   2475 	flatcc_builder_init(&builder);
   2476 	NdbEventMeta_start_as_root(&builder);
   2477 
   2478 	// no meta record, let's make one
   2479 	if (root == NULL) {
   2480 		NdbEventMeta_reactions_add(&builder, 1);
   2481 	} else {
   2482 		// clone existing and add to it
   2483 		meta = NdbEventMeta_as_root(root);
   2484 
   2485 		reactions = NdbEventMeta_reactions_get(meta);
   2486 		NdbEventMeta_clone(&builder, meta);
   2487 		NdbEventMeta_reactions_add(&builder, reactions + 1);
   2488 	}
   2489 
   2490 	NdbProfileRecord_end_as_root(&builder);
   2491 	root = flatcc_builder_finalize_aligned_buffer(&builder, &len);
   2492 	assert(((uint64_t)root % 8) == 0);
   2493 
   2494 	if (root == NULL) {
   2495 		ndb_debug("failed to create note metadata record\n");
   2496 		return 0;
   2497 	}
   2498 
   2499 	// metadata is keyed on id because we want to collect stats regardless
   2500 	// if we have the note yet or not
   2501 	key.mv_data = liked;
   2502 	key.mv_size = 32;
   2503 
   2504 	val.mv_data = root;
   2505 	val.mv_size = len;
   2506 
   2507 	// write the new meta record
   2508 	//ndb_debug("writing stats record for ");
   2509 	//print_hex(liked, 32);
   2510 	//ndb_debug("\n");
   2511 
   2512 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) {
   2513 		ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc));
   2514 		return 0;
   2515 	}
   2516 
   2517 	free(root);
   2518 
   2519 	return 1;
   2520 }
   2521 
   2522 
   2523 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
   2524 				   uint64_t note_key)
   2525 
   2526 {
   2527 	struct ndb_tsid tsid;
   2528 	int rc;
   2529 	MDB_val key, val;
   2530 	MDB_dbi id_db;
   2531 
   2532 	ndb_tsid_init(&tsid, note->id, note->created_at);
   2533 
   2534 	key.mv_data = &tsid;
   2535 	key.mv_size = sizeof(tsid);
   2536 	val.mv_data = &note_key;
   2537 	val.mv_size = sizeof(note_key);
   2538 
   2539 	id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   2540 
   2541 	if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) {
   2542 		ndb_debug("write note id index to db failed: %s\n",
   2543 				mdb_strerror(rc));
   2544 		return 0;
   2545 	}
   2546 
   2547 	return 1;
   2548 }
   2549 
   2550 static int ndb_filter_group_add_filters(struct ndb_filter_group *group,
   2551 					struct ndb_filter *filters,
   2552 					int num_filters)
   2553 {
   2554 	int i;
   2555 
   2556 	for (i = 0; i < num_filters; i++) {
   2557 		if (!ndb_filter_group_add(group, &filters[i]))
   2558 			return 0;
   2559 	}
   2560 
   2561 	return 1;
   2562 }
   2563 
   2564 
   2565 static struct ndb_filter_elements *
   2566 ndb_filter_get_elems(struct ndb_filter *filter, enum ndb_filter_fieldtype typ)
   2567 {
   2568 	int i;
   2569 	struct ndb_filter_elements *els;
   2570 
   2571 	for (i = 0; i < filter->num_elements; i++) {
   2572 		els = ndb_filter_get_elements(filter, i);
   2573 		assert(els);
   2574 		if (els->field.type == typ) {
   2575 			return els;
   2576 		}
   2577 	}
   2578 
   2579 	return NULL;
   2580 }
   2581 
   2582 static uint64_t *
   2583 ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ)
   2584 {
   2585 	struct ndb_filter_elements *els;
   2586 	if ((els = ndb_filter_get_elems(filter, typ)))
   2587 		return &els->elements[0];
   2588 	return NULL;
   2589 }
   2590 
   2591 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter,
   2592 				    enum ndb_filter_fieldtype typ)
   2593 {
   2594 	uint64_t *el;
   2595 	if (NULL == (el = ndb_filter_get_elem(filter, typ)))
   2596 		return NULL;
   2597 	return el;
   2598 }
   2599 
   2600 static inline int push_query_result(struct ndb_query_results *results,
   2601 				    struct ndb_query_result *result)
   2602 {
   2603 	return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result));
   2604 }
   2605 
   2606 static int compare_query_results(const void *pa, const void *pb)
   2607 {
   2608 	struct ndb_query_result *a, *b;
   2609 
   2610 	a = (struct ndb_query_result *)pa;
   2611 	b = (struct ndb_query_result *)pb;
   2612 
   2613 	if (a->note->created_at == b->note->created_at) {
   2614 		return 0;
   2615 	} else if (a->note->created_at > b->note->created_at) {
   2616 		return -1;
   2617 	} else {
   2618 		return 1;
   2619 	}
   2620 }
   2621 
   2622 static void ndb_query_result_init(struct ndb_query_result *res,
   2623 				  struct ndb_note *note,
   2624 				  uint64_t note_size,
   2625 				  uint64_t note_id)
   2626 {
   2627 	*res = (struct ndb_query_result){
   2628 		.note_id = note_id,
   2629 		.note_size = note_size,
   2630 		.note = note,
   2631 	};
   2632 }
   2633 
   2634 static int query_is_full(struct ndb_query_results *results, int limit)
   2635 {
   2636 	if (results->cur.p >= results->cur.end)
   2637 		return 1;
   2638 
   2639 	return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit;
   2640 }
   2641 
   2642 static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
   2643 				      struct ndb_filter *filter,
   2644 				      struct ndb_query_results *results,
   2645 				      int limit
   2646 				      )
   2647 {
   2648 	MDB_cursor *cur;
   2649 	MDB_dbi db;
   2650 	MDB_val k, v;
   2651 	int matched, rc, i;
   2652 	struct ndb_filter_elements *ids;
   2653 	struct ndb_note *note;
   2654 	struct ndb_query_result res;
   2655 	struct ndb_tsid tsid, *ptsid;
   2656 	uint64_t note_id, until, *pint;
   2657 	size_t note_size;
   2658 	unsigned char *id;
   2659 
   2660 	matched = 0;
   2661 	until = UINT64_MAX;
   2662 
   2663 	if (!(ids = ndb_filter_get_elems(filter, NDB_FILTER_IDS)))
   2664 		return 0;
   2665 
   2666 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   2667 		until = *pint;
   2668 
   2669 	db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   2670 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   2671 		return 0;
   2672 
   2673 	// for each id in our ids filter, find in the db
   2674 	for (i = 0; i < ids->count; i++) {
   2675 		if (query_is_full(results, limit))
   2676 			break;
   2677 
   2678 		id = ndb_filter_get_id_element(filter, ids, i);
   2679 		ndb_tsid_init(&tsid, (unsigned char *)id, until);
   2680 
   2681 		k.mv_data = &tsid;
   2682 		k.mv_size = sizeof(tsid);
   2683 
   2684 		if (!ndb_cursor_start(cur, &k, &v))
   2685 			continue;
   2686 
   2687 		ptsid = (struct ndb_tsid *)k.mv_data;
   2688 		note_id = *(uint64_t*)v.mv_data;
   2689 
   2690 		if (memcmp(id, ptsid->id, 32) == 0)
   2691 			matched |= 1 << NDB_FILTER_AUTHORS;
   2692 		else
   2693 			continue;
   2694 
   2695 		// get the note because we need it to match against the filter
   2696 		if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   2697 			continue;
   2698 
   2699 		// Sure this particular lookup matched the index query, but
   2700 		// does it match the entire filter? Check! We also pass in
   2701 		// things we've already matched via the filter so we don't have
   2702 		// to check again. This can be pretty important for filters
   2703 		// with a large number of entries.
   2704 		if (!ndb_filter_matches_with(filter, note, matched))
   2705 			continue;
   2706 
   2707 		ndb_query_result_init(&res, note, note_size, note_id);
   2708 		if (!push_query_result(results, &res))
   2709 			break;
   2710 	}
   2711 
   2712 	mdb_cursor_close(cur);
   2713 	return 1;
   2714 }
   2715 
   2716 //
   2717 // encode a tag index key
   2718 //
   2719 // consists of:
   2720 //
   2721 //   u8   tag
   2722 //   u8   tag_val_len
   2723 //   [u8] tag_val_bytes
   2724 //   u64  created_at
   2725 //
   2726 static int ndb_encode_tag_key(unsigned char *buf, int buf_size,
   2727 			      char tag, const unsigned char *val,
   2728 			      unsigned char val_len,
   2729 			      uint64_t timestamp)
   2730 {
   2731 	struct cursor writer;
   2732 	int ok;
   2733 
   2734 	// quick exit for obvious case where it will be too big. There can be
   2735 	// values of val_len that still fail, but we just let the writer handle
   2736 	// those failure cases
   2737 	if (val_len >= buf_size)
   2738 		return 0;
   2739 
   2740 	make_cursor(buf, buf + buf_size, &writer);
   2741 
   2742 	ok =
   2743 		cursor_push_byte(&writer, tag) &&
   2744 		cursor_push(&writer, (unsigned char*)val, val_len) &&
   2745 		cursor_push(&writer, (unsigned char*)&timestamp, sizeof(timestamp));
   2746 
   2747 	if (!ok)
   2748 		return 0;
   2749 
   2750 	return writer.p - writer.start;
   2751 }
   2752 
   2753 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn,
   2754 					     struct ndb_filter *filter,
   2755 					     struct ndb_query_results *results,
   2756 					     int limit)
   2757 {
   2758 	MDB_dbi db;
   2759 	MDB_val k, v;
   2760 	MDB_cursor *cur;
   2761 	int rc;
   2762 	struct ndb_note *note;
   2763 	struct ndb_tsid key, *pkey;
   2764 	uint64_t *pint, until, since, note_id;
   2765 	size_t note_size;
   2766 	struct ndb_query_result res;
   2767 	unsigned char high_key[32] = {0xFF};
   2768 
   2769 	db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   2770 
   2771 	until = UINT64_MAX;
   2772 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   2773 		until = *pint;
   2774 
   2775 	since = 0;
   2776 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
   2777 		since = *pint;
   2778 
   2779 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   2780 		return 0;
   2781 
   2782 	// if we have until, start there, otherwise just use max
   2783 	ndb_tsid_init(&key, high_key, until);
   2784 	k.mv_data = &key;
   2785 	k.mv_size = sizeof(key);
   2786 
   2787 	if (!ndb_cursor_start(cur, &k, &v))
   2788 		return 1;
   2789 
   2790 	while (!query_is_full(results, limit)) {
   2791 		pkey = (struct ndb_tsid *)k.mv_data;
   2792 		note_id = *(uint64_t*)v.mv_data;
   2793 		assert(v.mv_size == 8);
   2794 
   2795 		// TODO(perf): if we are only looking for IDs and have no other
   2796 		// condition, then we can use the ID in the index without
   2797 		// looking up the note. For now we always look up the note
   2798 		if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   2799 			goto next;
   2800 
   2801 		// does this entry match our filter?
   2802 		if (!ndb_filter_matches_with(filter, note, 0))
   2803 			goto next;
   2804 
   2805 		// don't continue the scan if we're below `since`
   2806 		if (pkey->timestamp < since)
   2807 			break;
   2808 
   2809 		ndb_query_result_init(&res, note, (uint64_t)note_size, note_id);
   2810 		if (!push_query_result(results, &res))
   2811 			break;
   2812 next:
   2813 		if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   2814 			break;
   2815 	}
   2816 
   2817 	mdb_cursor_close(cur);
   2818 	return 1;
   2819 }
   2820 
   2821 static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
   2822 				       struct ndb_filter *filter,
   2823 				       struct ndb_query_results *results,
   2824 				       int limit)
   2825 {
   2826 	MDB_cursor *cur;
   2827 	MDB_dbi db;
   2828 	MDB_val k, v;
   2829 	int len, taglen, rc, i;
   2830 	uint64_t *pint, until, note_id;
   2831 	size_t note_size;
   2832 	unsigned char key_buffer[255];
   2833 	struct ndb_note *note;
   2834 	struct ndb_filter_elements *tags;
   2835 	unsigned char *tag;
   2836 	struct ndb_query_result res;
   2837 
   2838 	db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
   2839 
   2840 	if (!(tags = ndb_filter_get_elems(filter, NDB_FILTER_TAGS)))
   2841 		return 0;
   2842 
   2843 	until = UINT64_MAX;
   2844 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   2845 		until = *pint;
   2846 
   2847 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   2848 		return 0;
   2849 
   2850 	for (i = 0; i < tags->count; i++) {
   2851 		tag = ndb_filter_get_id_element(filter, tags, i);
   2852 
   2853 		taglen = tags->field.elem_type == NDB_ELEMENT_ID
   2854 		       ? 32 : strlen((const char*)tag);
   2855 
   2856 		if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer),
   2857 					       tags->field.tag, tag, taglen,
   2858 					       until)))
   2859 			return 0;
   2860 
   2861 		k.mv_data = key_buffer;
   2862 		k.mv_size = len;
   2863 
   2864 		if (!ndb_cursor_start(cur, &k, &v))
   2865 			continue;
   2866 
   2867 		// for each id in our ids filter, find in the db
   2868 		while (!query_is_full(results, limit)) {
   2869 			// check if tag value matches, bail if not
   2870 			if (((unsigned char *)k.mv_data)[0] != tags->field.tag)
   2871 				break;
   2872 
   2873 			// check if tag value matches, bail if not
   2874 			if (taglen != k.mv_size - 9)
   2875 				break;
   2876 
   2877 			if (memcmp(k.mv_data+1, tag, k.mv_size-9))
   2878 				break;
   2879 
   2880 			note_id = *(uint64_t*)v.mv_data;
   2881 
   2882 			if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   2883 				goto next;
   2884 
   2885 			if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS))
   2886 				goto next;
   2887 
   2888 			ndb_query_result_init(&res, note, note_size, note_id);
   2889 			if (!push_query_result(results, &res))
   2890 				break;
   2891 
   2892 next:
   2893 			if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   2894 				break;
   2895 		}
   2896 	}
   2897 
   2898 	mdb_cursor_close(cur);
   2899 	return 1;
   2900 }
   2901 
   2902 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
   2903 					struct ndb_filter *filter,
   2904 					struct ndb_query_results *results,
   2905 					int limit)
   2906 {
   2907 	MDB_cursor *cur;
   2908 	MDB_dbi db;
   2909 	MDB_val k, v;
   2910 	struct ndb_note *note;
   2911 	struct ndb_u64_tsid tsid, *ptsid;
   2912 	struct ndb_filter_elements *kinds;
   2913 	struct ndb_query_result res;
   2914 	uint64_t kind, note_id, until, *pint;
   2915 	size_t note_size;
   2916 	int i, rc;
   2917 
   2918 	// we should have kinds in a kinds filter!
   2919 	if (!(kinds = ndb_filter_get_elems(filter, NDB_FILTER_KINDS)))
   2920 		return 0;
   2921 
   2922 	until = UINT64_MAX;
   2923 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   2924 		until = *pint;
   2925 
   2926 	db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
   2927 
   2928 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   2929 		return 0;
   2930 
   2931 	for (i = 0; i < kinds->count; i++) {
   2932 		if (query_is_full(results, limit))
   2933 			break;
   2934 
   2935 		kind = kinds->elements[i];
   2936 		ndb_debug("kind %" PRIu64 "\n", kind);
   2937 		ndb_u64_tsid_init(&tsid, kind, until);
   2938 
   2939 		k.mv_data = &tsid;
   2940 		k.mv_size = sizeof(tsid);
   2941 
   2942 		if (!ndb_cursor_start(cur, &k, &v))
   2943 			continue;
   2944 
   2945 		// for each id in our ids filter, find in the db
   2946 		while (!query_is_full(results, limit)) {
   2947 			ptsid = (struct ndb_u64_tsid *)k.mv_data;
   2948 			if (ptsid->u64 != kind)
   2949 				break;
   2950 
   2951 			note_id = *(uint64_t*)v.mv_data;
   2952 			if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   2953 				goto next;
   2954 
   2955 			if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS))
   2956 				goto next;
   2957 
   2958 			ndb_query_result_init(&res, note, note_size, note_id);
   2959 			if (!push_query_result(results, &res))
   2960 				break;
   2961 
   2962 next:
   2963 			if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   2964 				break;
   2965 		}
   2966 	}
   2967 
   2968 	mdb_cursor_close(cur);
   2969 	return 1;
   2970 }
   2971 
   2972 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
   2973 {
   2974 	struct ndb_filter_elements *ids, *kinds, *authors, *tags;
   2975 
   2976 	ids = ndb_filter_get_elems(filter, NDB_FILTER_IDS);
   2977 	kinds = ndb_filter_get_elems(filter, NDB_FILTER_KINDS);
   2978 	authors = ndb_filter_get_elems(filter, NDB_FILTER_AUTHORS);
   2979 	tags = ndb_filter_get_elems(filter, NDB_FILTER_TAGS);
   2980 
   2981 	// this is rougly similar to the heuristic in strfry's dbscan
   2982 	if (ids) {
   2983 		return NDB_PLAN_IDS;
   2984 	} else if (authors && authors->count <= 5) {
   2985 		// TODO: actually implment author plan and use it
   2986 		//return NDB_PLAN_AUTHORS;
   2987 		return NDB_PLAN_CREATED;
   2988 	} else if (tags && tags->count <= 5) {
   2989 		return NDB_PLAN_TAGS;
   2990 	} else if (kinds) {
   2991 		return NDB_PLAN_KINDS;
   2992 	}
   2993 
   2994 	return NDB_PLAN_CREATED;
   2995 }
   2996 
   2997 static const char *ndb_query_plan_name(int plan_id)
   2998 {
   2999 	switch (plan_id) {
   3000 		case NDB_PLAN_IDS:     return "ids";
   3001 		case NDB_PLAN_KINDS:   return "kinds";
   3002 		case NDB_PLAN_TAGS:    return "tags";
   3003 		case NDB_PLAN_CREATED: return "created";
   3004 		case NDB_PLAN_AUTHORS: return "authors";
   3005 	}
   3006 
   3007 	return "unknown";
   3008 }
   3009 
   3010 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
   3011 			    struct ndb_query_result *res, int capacity,
   3012 			    int *results_out)
   3013 {
   3014 	struct ndb_query_results results;
   3015 	uint64_t limit, *pint;
   3016 	enum ndb_query_plan plan;
   3017 	limit = capacity;
   3018 
   3019 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT)))
   3020 		limit = *pint;
   3021 
   3022 	limit = min(capacity, limit);
   3023 	make_cursor((unsigned char *)res,
   3024 		    ((unsigned char *)res) + limit * sizeof(*res),
   3025 		    &results.cur);
   3026 
   3027 	plan = ndb_filter_plan(filter);
   3028 	ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan));
   3029 	switch (plan) {
   3030 	// We have a list of ids, just open a cursor and jump to each once
   3031 	case NDB_PLAN_IDS:
   3032 		if (!ndb_query_plan_execute_ids(txn, filter, &results, limit))
   3033 			return 0;
   3034 		break;
   3035 
   3036 	// We have just kinds, just scan the kind index
   3037 	case NDB_PLAN_KINDS:
   3038 		if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit))
   3039 			return 0;
   3040 		break;
   3041 
   3042 	case NDB_PLAN_TAGS:
   3043 		if (!ndb_query_plan_execute_tags(txn, filter, &results, limit))
   3044 			return 0;
   3045 		break;
   3046 	case NDB_PLAN_CREATED:
   3047 		if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit))
   3048 			return 0;
   3049 		break;
   3050 	case NDB_PLAN_AUTHORS:
   3051 		// TODO: finish authors query plan
   3052 		return 0;
   3053 	}
   3054 
   3055 	*results_out = cursor_count(&results.cur, sizeof(*res));
   3056 	return 1;
   3057 }
   3058 
   3059 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters,
   3060 	      struct ndb_query_result *results, int result_capacity, int *count)
   3061 {
   3062 	int i, out;
   3063 	struct ndb_query_result *p = results;
   3064 
   3065 	out = 0;
   3066 	*count = 0;
   3067 
   3068 	for (i = 0; i < num_filters; i++) {
   3069 		if (!ndb_query_filter(txn, &filters[i], p,
   3070 				      result_capacity, &out)) {
   3071 			return 0;
   3072 		}
   3073 
   3074 		*count += out;
   3075 		p += out;
   3076 		result_capacity -= out;
   3077 		if (result_capacity <= 0)
   3078 			break;
   3079 	}
   3080 
   3081 	// sort results
   3082 	qsort(results, *count, sizeof(*results), compare_query_results);
   3083 	return 1;
   3084 }
   3085 
   3086 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note,
   3087 				    uint64_t note_key)
   3088 {
   3089 	unsigned char key_buffer[255];
   3090 	struct ndb_iterator iter;
   3091 	struct ndb_str tkey, tval;
   3092 	char tchar;
   3093 	int len, rc;
   3094 	MDB_val key, val;
   3095 	MDB_dbi tags_db;
   3096 
   3097 	tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
   3098 
   3099 	ndb_tags_iterate_start(note, &iter);
   3100 
   3101 	while (ndb_tags_iterate_next(&iter)) {
   3102 		if (iter.tag->count < 2)
   3103 			continue;
   3104 
   3105 		tkey = ndb_tag_str(note, iter.tag, 0);
   3106 
   3107 		// we only write indices for 1-char tags.
   3108 		tchar = tkey.str[0];
   3109 		if (tchar == 0 || tkey.str[1] != 0)
   3110 			continue;
   3111 
   3112 		tval = ndb_tag_str(note, iter.tag, 1);
   3113 		len = ndb_str_len(&tval);
   3114 
   3115 		if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer),
   3116 					       tchar, tval.id, (unsigned char)len,
   3117 					       ndb_note_created_at(note)))) {
   3118 			// this will fail when we try to encode a key that is
   3119 			// too big
   3120 			continue;
   3121 		}
   3122 
   3123 		//ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len);
   3124 
   3125 		key.mv_data = key_buffer;
   3126 		key.mv_size = len;
   3127 
   3128 		val.mv_data = &note_key;
   3129 		val.mv_size = sizeof(note_key);
   3130 
   3131 		if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) {
   3132 			ndb_debug("write note tag index to db failed: %s\n",
   3133 					mdb_strerror(rc));
   3134 			return 0;
   3135 		}
   3136 	}
   3137 
   3138 	return 1;
   3139 }
   3140 
   3141 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
   3142 				     uint64_t note_key)
   3143 {
   3144 	struct ndb_u64_tsid tsid;
   3145 	int rc;
   3146 	MDB_val key, val;
   3147 	MDB_dbi kind_db;
   3148 
   3149 	ndb_u64_tsid_init(&tsid, note->kind, note->created_at);
   3150 
   3151 	key.mv_data = &tsid;
   3152 	key.mv_size = sizeof(tsid);
   3153 	val.mv_data = &note_key;
   3154 	val.mv_size = sizeof(note_key);
   3155 
   3156 	kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
   3157 
   3158 	if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) {
   3159 		ndb_debug("write note kind index to db failed: %s\n",
   3160 				mdb_strerror(rc));
   3161 		return 0;
   3162 	}
   3163 
   3164 	return 1;
   3165 }
   3166 
   3167 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word,
   3168 				   int word_len, int word_index,
   3169 				   uint64_t timestamp, uint64_t note_id)
   3170 {
   3171 	// cap to some reasonable key size
   3172 	unsigned char buffer[1024];
   3173 	int keysize, rc;
   3174 	MDB_val k, v;
   3175 	MDB_dbi text_db;
   3176 
   3177 	// build our compressed text index key
   3178 	if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index,
   3179 				      word_len, word, timestamp, note_id,
   3180 				      &keysize)) {
   3181 		// probably too big
   3182 
   3183 		return 0;
   3184 	}
   3185 
   3186 	k.mv_data = buffer;
   3187 	k.mv_size = keysize;
   3188 
   3189 	v.mv_data = NULL;
   3190 	v.mv_size = 0;
   3191 
   3192 	text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
   3193 
   3194 	if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) {
   3195 		ndb_debug("write note text index to db failed: %s\n",
   3196 				mdb_strerror(rc));
   3197 		return 0;
   3198 	}
   3199 
   3200 	return 1;
   3201 }
   3202 
   3203 
   3204 
   3205 // break a string into individual words for querying or for building the
   3206 // fulltext search index. This is callback based so we don't need to
   3207 // build up an intermediate structure
   3208 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn)
   3209 {
   3210 	int word_len, words;
   3211 	const char *word;
   3212 
   3213 	words = 0;
   3214 
   3215 	while (cur->p < cur->end) {
   3216 		consume_whitespace_or_punctuation(cur);
   3217 		if (cur->p >= cur->end)
   3218 			break;
   3219 		word = (const char *)cur->p;
   3220 
   3221 		if (!consume_until_boundary(cur))
   3222 			break;
   3223 
   3224 		// start of word or end
   3225 		word_len = cur->p - (unsigned char *)word;
   3226 		if (word_len == 0 && cur->p >= cur->end)
   3227 			break;
   3228 
   3229 		if (word_len == 0) {
   3230 			if (!cursor_skip(cur, 1))
   3231 				break;
   3232 			continue;
   3233 		}
   3234 
   3235 		//ndb_debug("writing word index '%.*s'\n", word_len, word);
   3236 
   3237 		if (!fn(ctx, word, word_len, words))
   3238 			continue;
   3239 
   3240 		words++;
   3241 	}
   3242 
   3243 	return 1;
   3244 }
   3245 
// Context passed to ndb_fulltext_word_writer through the opaque `ctx` pointer
// of ndb_parse_words when indexing a note's content.
struct ndb_word_writer_ctx
{
	// transaction the index entries are written through
	struct ndb_txn *txn;
	// note being indexed; its created_at is used as the entry timestamp
	struct ndb_note *note;
	// handed to ndb_write_word_to_index as the note id for each word
	uint64_t note_id;
};
   3252 
   3253 static int ndb_fulltext_word_writer(void *ctx,
   3254 		const char *word, int word_len, int words)
   3255 {
   3256 	struct ndb_word_writer_ctx *wctx = ctx;
   3257 
   3258 	if (!ndb_write_word_to_index(wctx->txn, word, word_len, words,
   3259 				     wctx->note->created_at, wctx->note_id)) {
   3260 		// too big to write this one, just skip it
   3261 		ndb_debug("failed to write word '%.*s' to index\n", word_len, word);
   3262 
   3263 		return 0;
   3264 	}
   3265 
   3266 	//fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word);
   3267 	return 1;
   3268 }
   3269 
   3270 static int ndb_write_note_fulltext_index(struct ndb_txn *txn,
   3271 					 struct ndb_note *note,
   3272 					 uint64_t note_id)
   3273 {
   3274 	struct cursor cur;
   3275 	unsigned char *content;
   3276 	struct ndb_str str;
   3277 	struct ndb_word_writer_ctx ctx;
   3278 
   3279 	str = ndb_note_str(note, &note->content);
   3280 	// I don't think this should happen?
   3281 	if (unlikely(str.flag == NDB_PACKED_ID))
   3282 		return 0;
   3283 
   3284 	content = (unsigned char *)str.str;
   3285 
   3286 	make_cursor(content, content + note->content_length, &cur);
   3287 
   3288 	ctx.txn = txn;
   3289 	ctx.note = note;
   3290 	ctx.note_id = note_id;
   3291 
   3292 	ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer);
   3293 
   3294 	return 1;
   3295 }
   3296 
   3297 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index)
   3298 {
   3299 	(void)word_index;
   3300 	struct ndb_search_words *words = ctx;
   3301 	struct ndb_word *word;
   3302 
   3303 	if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS)
   3304 		return 0;
   3305 
   3306 	word = &words->words[words->num_words++];
   3307 	word->word = word_str;
   3308 	word->word_len = word_len;
   3309 
   3310 	return 1;
   3311 }
   3312 
// Reset a search word list so that it contains no words.
static void ndb_search_words_init(struct ndb_search_words *words)
{
	words->num_words = 0;
}
   3317 
   3318 static int prefix_count(const char *str1, int len1, const char *str2, int len2) {
   3319 	int i, count = 0;
   3320 	int min_len = len1 < len2 ? len1 : len2;
   3321 
   3322 	for (i = 0; i < min_len; i++) {
   3323 		// case insensitive
   3324 		if (tolower(str1[i]) == tolower(str2[i]))
   3325 			count++;
   3326 		else
   3327 			break;
   3328 	}
   3329 
   3330 	return count;
   3331 }
   3332 
   3333 static int ndb_prefix_matches(struct ndb_text_search_result *result,
   3334 			      struct ndb_word *search_word)
   3335 {
   3336 	// Empty strings shouldn't happen but let's
   3337 	if (result->key.str_len < 2 || search_word->word_len < 2)
   3338 		return 0;
   3339 
   3340 	// make sure we at least have two matching prefix characters. exact
   3341 	// matches are nice but range searches allow us to match prefixes as
   3342 	// well. A double-char prefix is suffient, but maybe we could up this
   3343 	// in the future.
   3344 	//
   3345 	// TODO: How are we handling utf-8 prefix matches like
   3346 	// japanese?
   3347 	//
   3348 	if (   result->key.str[0] != tolower(search_word->word[0])
   3349 	    && result->key.str[1] != tolower(search_word->word[1])
   3350 	    )
   3351 		return 0;
   3352 
   3353 	// count the number of prefix-matched characters. This will be used
   3354 	// for ranking search results
   3355 	result->prefix_chars = prefix_count(result->key.str,
   3356 					    result->key.str_len,
   3357 					    search_word->word,
   3358 					    search_word->word_len);
   3359 
   3360 	if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5))
   3361 		return 0;
   3362 
   3363 	return 1;
   3364 }
   3365 
// This is called when scanning the full text search index. Scanning stops
// when we no longer have a prefix match for the word
//
// cursor:      open cursor on the fulltext index
// op:          cursor operation used to position the cursor for this call
// k:           in: the key to position with; out: the key the cursor landed on
// search_word: the query word the landed key must prefix-match
// last_result: if non-NULL, the landed key must belong to the same note id
// result:      receives the unpacked key (and prefix_chars) of the match
// order_op:    scan direction of the overall search (MDB_PREV or MDB_NEXT)
//
// Returns 1 if a matching key was found and `result` was filled in, 0
// otherwise.
static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op,
	MDB_val *k, struct ndb_word *search_word,
	struct ndb_text_search_result *last_result,
	struct ndb_text_search_result *result,
	MDB_cursor_op order_op)
{
	struct cursor key_cursor;
	//struct ndb_text_search_key search_key;
	MDB_val v;
	int retries;
	// starts at -1 so that the first pass through `retry:` counts as 0
	retries = -1;

	make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor);

	// When op is MDB_SET_RANGE, this initializes the search. Position
	// the cursor at the next key greater than or equal to the specified
	// key.
	//
	// Subsequent searches should use MDB_NEXT
	if (mdb_cursor_get(cursor, k, &v, op)) {
		// we should only do this if we're going in reverse
		if (op == MDB_SET_RANGE && order_op == MDB_PREV) {
			// if set range worked and our key exists, it should be
			// the one right before this one
			if (mdb_cursor_get(cursor, k, &v, MDB_PREV))
				return 0;
		} else {
			return 0;
		}
	}

retry:
	retries++;
	/*
	printf("continuing from ");
	if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) {
		ndb_print_text_search_key(&search_key);
	} else { printf("??"); }
	printf("\n");
	*/

	// k now refers to the key the cursor landed on, so rebuild the
	// parsing cursor over it
	make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor);

	if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) {
		fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n");
		return 0;
	}

	// when narrowing down a previous match, only keys that belong to the
	// same note are acceptable
	if (last_result) {
		if (last_result->key.note_id != result->key.note_id)
			return 0;
	}

	// On success, this could still be not related at all.
	// It could just be adjacent to the word. Let's check
	// if we have a matching prefix at least.

	// Before we unpack the entire key, let's quickly
	// unpack just the string to check the prefix. We don't
	// need to unpack the entire key if the prefix doesn't
	// match
	if (!ndb_unpack_text_search_key_string(&key_cursor,
					       &result->key.str,
					       &result->key.str_len)) {
		// this should never happen
		fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n");
		return 0;
	}

	if (!ndb_prefix_matches(result, search_word)) {
		/*
		printf("result prefix '%.*s' didn't match search word '%.*s'\n",
			result->key.str_len, result->key.str,
			search_word->word_len, search_word->word);
			*/
		// we should only do this if we're going in reverse, and only
		// once (retries == 0 on the first pass)
		if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) {
			// if set range worked and our key exists, it should be
			// the one right before this one
			// NOTE(review): the return value is ignored here;
			// presumably a failed step leaves k unchanged so the
			// second pass fails the prefix check again -- confirm
			mdb_cursor_get(cursor, k, &v, MDB_PREV);
			goto retry;
		} else {
			return 0;
		}
	}

	// Unpack the remaining text search key, we will need this information
	// when building up our search results.
	if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) {
		// This should never happen
		fprintf(stderr, "UNUSUAL: failed to unpack text search key\n");
		return 0;
	}

			/*
	if (last_result) {
		if (result->key.word_index < last_result->key.word_index) {
			fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n",
					result->key.str_len, result->key.str,
					last_result->key.str_len, last_result->key.str);
			return 0;
		}
	}
					*/

	return 1;
}
   3475 
// Reset a text search result set so that it contains no results.
static void ndb_text_search_results_init(
		struct ndb_text_search_results *results) {
	results->num_results = 0;
}
   3480 
// Fill `cfg` with the default text search settings: descending order and a
// limit of MAX_TEXT_SEARCH_RESULTS.
void ndb_default_text_search_config(struct ndb_text_search_config *cfg)
{
	cfg->order = NDB_ORDER_DESCENDING;
	cfg->limit = MAX_TEXT_SEARCH_RESULTS;
}
   3486 
// Set the result ordering stored in a text search config.
void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg,
				     enum ndb_search_order order)
{
	cfg->order = order;
}
   3492 
// Set the maximum number of results stored in a text search config.
void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit)
{
	cfg->limit = limit;
}
   3497 
   3498 int ndb_text_search(struct ndb_txn *txn, const char *query,
   3499 		    struct ndb_text_search_results *results,
   3500 		    struct ndb_text_search_config *config)
   3501 {
   3502 	unsigned char buffer[1024], *buf;
   3503 	unsigned char saved_buf[1024], *saved;
   3504 	struct ndb_text_search_result *result, *last_result;
   3505 	struct ndb_text_search_result candidate, last_candidate;
   3506 	struct ndb_search_words search_words;
   3507 	//struct ndb_text_search_key search_key;
   3508 	struct ndb_word *search_word;
   3509 	struct cursor cur;
   3510 	ndb_text_search_key_order_fn key_order_fn;
   3511 	MDB_dbi text_db;
   3512 	MDB_cursor *cursor;
   3513 	MDB_val k, v;
   3514 	int i, j, keysize, saved_size, limit;
   3515 	MDB_cursor_op op, order_op;
   3516 
   3517 	saved = NULL;
   3518 	ndb_text_search_results_init(results);
   3519 	ndb_search_words_init(&search_words);
   3520 
   3521 	// search config
   3522 	limit = MAX_TEXT_SEARCH_RESULTS;
   3523 	order_op = MDB_PREV;
   3524 	key_order_fn = ndb_make_text_search_key_high;
   3525 	if (config) {
   3526 		if (config->order == NDB_ORDER_ASCENDING) {
   3527 			order_op = MDB_NEXT;
   3528 			key_order_fn = ndb_make_text_search_key_low;
   3529 		}
   3530 		limit = min(limit, config->limit);
   3531 	}
   3532 	// end search config
   3533 
   3534 	text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
   3535 	make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur);
   3536 
   3537 	ndb_parse_words(&cur, &search_words, ndb_parse_search_words);
   3538 	if (search_words.num_words == 0)
   3539 		return 0;
   3540 
   3541 	if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) {
   3542 		fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i);
   3543 		return 0;
   3544 	}
   3545 
   3546 	// for each word, we recursively find all of the submatches
   3547 	while (results->num_results < limit) {
   3548 		last_result = NULL;
   3549 		result = &results->results[results->num_results];
   3550 
   3551 		// if we have saved, then we continue from the last root search
   3552 		// sequence
   3553 		if (saved) {
   3554 			buf = saved_buf;
   3555 			saved = NULL;
   3556 			keysize = saved_size;
   3557 
   3558 			k.mv_data = buf;
   3559 			k.mv_size = saved_size;
   3560 
   3561 			// reposition the cursor so we can continue
   3562 			if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE))
   3563 				break;
   3564 
   3565 			op = order_op;
   3566 		} else {
   3567 			// construct a packed fulltext search key using this
   3568 			// word this key doesn't contain any timestamp or index
   3569 			// info, so it should range match instead of exact
   3570 			// match
   3571 			if (!key_order_fn(buffer, sizeof(buffer),
   3572 					  search_words.words[0].word_len,
   3573 					  search_words.words[0].word, &keysize))
   3574 			{
   3575 				// word is too big to fit in 1024-sized key
   3576 				continue;
   3577 			}
   3578 
   3579 			buf = buffer;
   3580 			op = MDB_SET_RANGE;
   3581 		}
   3582 
   3583 		for (j = 0; j < search_words.num_words; j++) {
   3584 			search_word = &search_words.words[j];
   3585 
   3586 			// shouldn't happen but let's be defensive a bit
   3587 			if (search_word->word_len == 0)
   3588 				continue;
   3589 
   3590 			// if we already matched a note in this phrase, make
   3591 			// sure we're including the note id in the query
   3592 			if (last_result) {
   3593 				// we are narrowing down a search.
   3594 				// if we already have this note id, just continue
   3595 				for (i = 0; i < results->num_results; i++) {
   3596 					if (results->results[i].key.note_id == last_result->key.note_id)
   3597 						goto cont;
   3598 				}
   3599 
   3600 				if (!ndb_make_noted_text_search_key(
   3601 					buffer, sizeof(buffer),
   3602 					search_word->word_len,
   3603 					search_word->word,
   3604 					last_result->key.timestamp,
   3605 					last_result->key.note_id,
   3606 					&keysize))
   3607 				{
   3608 					continue;
   3609 				}
   3610 
   3611 				buf = buffer;
   3612 			}
   3613 
   3614 			k.mv_data = buf;
   3615 			k.mv_size = keysize;
   3616 
   3617 			if (!ndb_text_search_next_word(cursor, op, &k,
   3618 						       search_word,
   3619 						       last_result,
   3620 						       &candidate,
   3621 						       order_op)) {
   3622 				break;
   3623 			}
   3624 
   3625 			*result = candidate;
   3626 			op = MDB_SET_RANGE;
   3627 
   3628 			// save the first key match, since we will continue from
   3629 			// this on the next root word result
   3630 			if (j == 0 && !saved) {
   3631 				memcpy(saved_buf, k.mv_data, k.mv_size);
   3632 				saved = saved_buf;
   3633 				saved_size = k.mv_size;
   3634 			}
   3635 
   3636 			last_candidate = *result;
   3637 			last_result = &last_candidate;
   3638 		}
   3639 
   3640 cont:
   3641 		// we matched all of the queries!
   3642 		if (j == search_words.num_words) {
   3643 			results->num_results++;
   3644 		} else if (j == 0) {
   3645 			break;
   3646 		}
   3647 
   3648 	}
   3649 
   3650 	mdb_cursor_close(cursor);
   3651 
   3652 	return 1;
   3653 }
   3654 
   3655 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key,
   3656 			     struct ndb_blocks *blocks)
   3657 {
   3658 	int rc;
   3659 	MDB_val key, val;
   3660 
   3661 	// make sure we're not writing the owned flag to the db
   3662 	blocks->flags &= ~NDB_BLOCK_FLAG_OWNED;
   3663 
   3664 	key.mv_data = &note_key;
   3665 	key.mv_size = sizeof(note_key);
   3666 	val.mv_data = blocks;
   3667 	val.mv_size = ndb_blocks_total_size(blocks);
   3668 	assert((val.mv_size % 8) == 0);
   3669 
   3670 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) {
   3671 		ndb_debug("write version to note_blocks failed: %s\n",
   3672 				mdb_strerror(rc));
   3673 		return;
   3674 	}
   3675 }
   3676 
   3677 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note,
   3678 				uint64_t note_key, unsigned char *scratch,
   3679 				size_t scratch_size)
   3680 {
   3681 	size_t content_len;
   3682 	const char *content;
   3683 	struct ndb_blocks *blocks;
   3684 
   3685 	content_len = ndb_note_content_length(note);
   3686 	content = ndb_note_content(note);
   3687 
   3688 	if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) {
   3689 		//ndb_debug("failed to parse content '%.*s'\n", content_len, content);
   3690 		return 0;
   3691 	}
   3692 
   3693 	ndb_write_blocks(txn, note_key, blocks);
   3694 	return 1;
   3695 }
   3696 
   3697 static uint64_t ndb_write_note(struct ndb_txn *txn,
   3698 			       struct ndb_writer_note *note,
   3699 			       unsigned char *scratch, size_t scratch_size)
   3700 {
   3701 	int rc;
   3702 	uint64_t note_key;
   3703 	MDB_dbi note_db;
   3704 	MDB_val key, val;
   3705 
   3706 	// let's quickly sanity check if we already have this note
   3707 	if (ndb_get_notekey_by_id(txn, note->note->id))
   3708 		return 0;
   3709 
   3710 	// get dbs
   3711 	note_db = txn->lmdb->dbs[NDB_DB_NOTE];
   3712 
   3713 	// get new key
   3714 	note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1;
   3715 
   3716 	// write note to event store
   3717 	key.mv_data = &note_key;
   3718 	key.mv_size = sizeof(note_key);
   3719 	val.mv_data = note->note;
   3720 	val.mv_size = note->note_len;
   3721 
   3722 	if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) {
   3723 		ndb_debug("write note to db failed: %s\n", mdb_strerror(rc));
   3724 		return 0;
   3725 	}
   3726 
   3727 	ndb_write_note_id_index(txn, note->note, note_key);
   3728 	ndb_write_note_kind_index(txn, note->note, note_key);
   3729 	ndb_write_note_tag_index(txn, note->note, note_key);
   3730 
   3731 	// only parse content and do fulltext index on text and longform notes
   3732 	if (note->note->kind == 1 || note->note->kind == 30023) {
   3733 		if (!ndb_write_note_fulltext_index(txn, note->note, note_key))
   3734 			return 0;
   3735 
   3736 		// write note blocks
   3737 		ndb_write_new_blocks(txn, note->note, note_key, scratch,
   3738 				     scratch_size);
   3739 	}
   3740 
   3741 	if (note->note->kind == 7) {
   3742 		ndb_write_reaction_stats(txn, note->note);
   3743 	}
   3744 
   3745 	return note_key;
   3746 }
   3747 
   3748 // only to be called from the writer thread
   3749 static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
   3750 {
   3751 	int rc;
   3752 	MDB_val key, val;
   3753 	uint64_t version_key;
   3754 
   3755 	version_key = NDB_META_KEY_VERSION;
   3756 
   3757 	key.mv_data = &version_key;
   3758 	key.mv_size = sizeof(version_key);
   3759 	val.mv_data = &version;
   3760 	val.mv_size = sizeof(version);
   3761 
   3762 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
   3763 		ndb_debug("write version to ndb_meta failed: %s\n",
   3764 				mdb_strerror(rc));
   3765 		return;
   3766 	}
   3767 
   3768 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
   3769 }
   3770 
// A note written by the writer thread during the current batch, kept so that
// subscriptions can be notified about it afterwards.
struct written_note {
	// primary key returned by ndb_write_note; this is the value pushed to
	// subscription inboxes
	uint64_t note_id;
	// the writer message payload the note came from
	struct ndb_writer_note *note;
};
   3775 
   3776 // When the data has been committed to the database, take all of the written
   3777 // notes, check them against subscriptions, and then write to the subscription
   3778 // inbox for all matching notes
   3779 static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
   3780 				     struct written_note *wrote, int num_notes)
   3781 {
   3782 	int i, k;
   3783 	int pushed;
   3784 	struct written_note *written;
   3785 	struct ndb_note *note;
   3786 	struct ndb_subscription *sub;
   3787 
   3788 	for (i = 0; i < monitor->num_subscriptions; i++) {
   3789 		sub = &monitor->subscriptions[i];
   3790 		ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
   3791 
   3792 		pushed = 0;
   3793 		for (k = 0; k < num_notes; k++) {
   3794 			written = &wrote[k];
   3795 			note = written->note->note;
   3796 
   3797 			if (ndb_filter_group_matches(&sub->group, note)) {
   3798 				ndb_debug("pushing note\n");
   3799 
   3800 				if (!prot_queue_push(&sub->inbox, &written->note_id)) {
   3801 					ndb_debug("couldn't push note to subscriber");
   3802 				} else {
   3803 					pushed++;
   3804 				}
   3805 			} else {
   3806 				ndb_debug("not pushing note\n");
   3807 			}
   3808 		}
   3809 
   3810 		// After pushing all of the matching notes, check to see if we
   3811 		// have a registered subscription callback. If so, we call it.
   3812 		// The callback needs to call ndb_poll_for_notes to pull data
   3813 		// that was just pushed to the queue in the for loop above.
   3814 		if (monitor->sub_cb != NULL && pushed > 0) {
   3815 			monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
   3816 		}
   3817 	}
   3818 }
   3819 
   3820 static void *ndb_writer_thread(void *data)
   3821 {
   3822 	struct ndb_writer *writer = data;
   3823 	struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
   3824 	struct written_note written_notes[THREAD_QUEUE_BATCH];
   3825 	size_t scratch_size;
   3826 	int i, popped, done, any_note, num_notes;
   3827 	uint64_t note_nkey;
   3828 	struct ndb_txn txn;
   3829 	unsigned char *scratch;
   3830 
   3831 	// 8mb scratch buffer for parsing note content
   3832 	scratch_size = 8 * 1024 * 1024;
   3833 	scratch = malloc(scratch_size);
   3834 	MDB_txn *mdb_txn = NULL;
   3835 	ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
   3836 
   3837 	done = 0;
   3838 	while (!done) {
   3839 		txn.mdb_txn = NULL;
   3840 		num_notes = 0;
   3841 		popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
   3842 		ndb_debug("writer popped %d items\n", popped);
   3843 
   3844 		any_note = 0;
   3845 		for (i = 0 ; i < popped; i++) {
   3846 			msg = &msgs[i];
   3847 			switch (msg->type) {
   3848 			case NDB_WRITER_NOTE: any_note = 1; break;
   3849 			case NDB_WRITER_PROFILE: any_note = 1; break;
   3850 			case NDB_WRITER_DBMETA: any_note = 1; break;
   3851 			case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break;
   3852 			case NDB_WRITER_BLOCKS: any_note = 1; break;
   3853 			case NDB_WRITER_QUIT: break;
   3854 			}
   3855 		}
   3856 
   3857 		if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn))
   3858 		{
   3859 			fprintf(stderr, "writer thread txn_begin failed");
   3860 			// should definitely not happen unless DB is full
   3861 			// or something ?
   3862 			continue;
   3863 		}
   3864 
   3865 		for (i = 0; i < popped; i++) {
   3866 			msg = &msgs[i];
   3867 
   3868 			switch (msg->type) {
   3869 			case NDB_WRITER_QUIT:
   3870 				// quits are handled before this
   3871 				done = 1;
   3872 				continue;
   3873 			case NDB_WRITER_PROFILE:
   3874 				note_nkey =
   3875 					ndb_write_note(&txn, &msg->note,
   3876 						       scratch, scratch_size);
   3877 				if (note_nkey > 0) {
   3878 					written_notes[num_notes++] =
   3879 					(struct written_note){
   3880 						.note_id = note_nkey,
   3881 						.note = &msg->note,
   3882 					};
   3883 				} else {
   3884 					ndb_debug("failed to write note\n");
   3885 				}
   3886 				if (msg->profile.record.builder) {
   3887 					// only write if parsing didn't fail
   3888 					ndb_write_profile(&txn, &msg->profile,
   3889 							  note_nkey);
   3890 				}
   3891 				break;
   3892 			case NDB_WRITER_NOTE:
   3893 				note_nkey = ndb_write_note(&txn, &msg->note,
   3894 							   scratch,
   3895 							   scratch_size);
   3896 
   3897 				if (note_nkey > 0) {
   3898 					written_notes[num_notes++] = (struct written_note){
   3899 						.note_id = note_nkey,
   3900 						.note = &msg->note,
   3901 					};
   3902 				}
   3903 				break;
   3904 			case NDB_WRITER_DBMETA:
   3905 				ndb_write_version(&txn, msg->ndb_meta.version);
   3906 				break;
   3907 			case NDB_WRITER_BLOCKS:
   3908 				ndb_write_blocks(&txn, msg->blocks.note_key,
   3909 						       msg->blocks.blocks);
   3910 				break;
   3911 			case NDB_WRITER_PROFILE_LAST_FETCH:
   3912 				ndb_writer_last_profile_fetch(&txn,
   3913 						msg->last_fetch.pubkey,
   3914 						msg->last_fetch.fetched_at
   3915 						);
   3916 				break;
   3917 			}
   3918 		}
   3919 
   3920 		// commit writes
   3921 		if (any_note) {
   3922 			if (!ndb_end_query(&txn)) {
   3923 				ndb_debug("writer thread txn commit failed\n");
   3924 			} else {
   3925 				ndb_debug("notifying subscriptions, %d notes\n", num_notes);
   3926 				ndb_notify_subscriptions(writer->monitor,
   3927 							 written_notes,
   3928 							 num_notes);
   3929 				// update subscriptions
   3930 			}
   3931 		}
   3932 
   3933 		// free notes
   3934 		for (i = 0; i < popped; i++) {
   3935 			msg = &msgs[i];
   3936 			if (msg->type == NDB_WRITER_NOTE) {
   3937 				free(msg->note.note);
   3938 			} else if (msg->type == NDB_WRITER_PROFILE) {
   3939 				free(msg->profile.note.note);
   3940 				ndb_profile_record_builder_free(&msg->profile.record);
   3941 			}  else if (msg->type == NDB_WRITER_BLOCKS) {
   3942 				ndb_blocks_free(msg->blocks.blocks);
   3943 			}
   3944 		}
   3945 	}
   3946 
   3947 	free(scratch);
   3948 	ndb_debug("quitting writer thread\n");
   3949 	return NULL;
   3950 }
   3951 
   3952 static void *ndb_ingester_thread(void *data)
   3953 {
   3954 	secp256k1_context *ctx;
   3955 	struct thread *thread = data;
   3956 	struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
   3957 	struct ndb_lmdb *lmdb = ingester->writer->lmdb;
   3958 	struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
   3959 	struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
   3960 	int i, to_write, popped, done, any_event;
   3961 	MDB_txn *read_txn = NULL;
   3962 	int rc;
   3963 
   3964 	ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
   3965 	ndb_debug("started ingester thread\n");
   3966 
   3967 	done = 0;
   3968 	while (!done) {
   3969 		to_write = 0;
   3970 		any_event = 0;
   3971 
   3972 		popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
   3973 		ndb_debug("ingester popped %d items\n", popped);
   3974 
   3975 		for (i = 0; i < popped; i++) {
   3976 			msg = &msgs[i];
   3977 			if (msg->type == NDB_INGEST_EVENT) {
   3978 				any_event = 1;
   3979 				break;
   3980 			}
   3981 		}
   3982 
   3983 		if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) {
   3984 			// this is bad
   3985 			fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n",
   3986 					mdb_strerror(rc));
   3987 			continue;
   3988 		}
   3989 
   3990 		for (i = 0; i < popped; i++) {
   3991 			msg = &msgs[i];
   3992 			switch (msg->type) {
   3993 			case NDB_INGEST_QUIT:
   3994 				done = 1;
   3995 				break;
   3996 
   3997 			case NDB_INGEST_EVENT:
   3998 				out = &outs[to_write];
   3999 				if (ndb_ingester_process_event(ctx, ingester,
   4000 							       &msg->event, out,
   4001 							       read_txn)) {
   4002 					to_write++;
   4003 				}
   4004 			}
   4005 		}
   4006 
   4007 		if (any_event)
   4008 			mdb_txn_abort(read_txn);
   4009 
   4010 		if (to_write > 0) {
   4011 			ndb_debug("pushing %d events to write queue\n", to_write);
   4012 			if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) {
   4013 				ndb_debug("failed pushing %d events to write queue\n", to_write);
   4014 			}
   4015 		}
   4016 	}
   4017 
   4018 	ndb_debug("quitting ingester thread\n");
   4019 	secp256k1_context_destroy(ctx);
   4020 	return NULL;
   4021 }
   4022 
   4023 
   4024 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
   4025 			   struct ndb_monitor *monitor)
   4026 {
   4027 	writer->lmdb = lmdb;
   4028 	writer->monitor = monitor;
   4029 	writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
   4030 	writer->queue_buf = malloc(writer->queue_buflen);
   4031 	if (writer->queue_buf == NULL) {
   4032 		fprintf(stderr, "ndb: failed to allocate space for writer queue");
   4033 		return 0;
   4034 	}
   4035 
   4036 	// init the writer queue.
   4037 	prot_queue_init(&writer->inbox, writer->queue_buf,
   4038 			writer->queue_buflen, sizeof(struct ndb_writer_msg));
   4039 
   4040 	// spin up the writer thread
   4041 	if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer))
   4042 	{
   4043 		fprintf(stderr, "ndb writer thread failed to create\n");
   4044 		return 0;
   4045 	}
   4046 
   4047 	return 1;
   4048 }
   4049 
   4050 // initialize the ingester queue and then spawn the thread
   4051 static int ndb_ingester_init(struct ndb_ingester *ingester,
   4052 			     struct ndb_writer *writer,
   4053 			     const struct ndb_config *config)
   4054 {
   4055 	int elem_size, num_elems;
   4056 	static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT };
   4057 
   4058 	// TODO: configurable queue sizes
   4059 	elem_size = sizeof(struct ndb_ingester_msg);
   4060 	num_elems = DEFAULT_QUEUE_SIZE;
   4061 
   4062 	ingester->writer = writer;
   4063 	ingester->flags = config->flags;
   4064 	ingester->filter = config->ingest_filter;
   4065 	ingester->filter_context = config->filter_context;
   4066 
   4067 	if (!threadpool_init(&ingester->tp, config->ingester_threads,
   4068 			     elem_size, num_elems, &quit_msg, ingester,
   4069 			     ndb_ingester_thread))
   4070 	{
   4071 		fprintf(stderr, "ndb ingester threadpool failed to init\n");
   4072 		return 0;
   4073 	}
   4074 
   4075 	return 1;
   4076 }
   4077 
   4078 static int ndb_writer_destroy(struct ndb_writer *writer)
   4079 {
   4080 	struct ndb_writer_msg msg;
   4081 
   4082 	// kill thread
   4083 	msg.type = NDB_WRITER_QUIT;
   4084 	if (!prot_queue_push(&writer->inbox, &msg)) {
   4085 		// queue is too full to push quit message. just kill it.
   4086 		pthread_exit(&writer->thread_id);
   4087 	} else {
   4088 		pthread_join(writer->thread_id, NULL);
   4089 	}
   4090 
   4091 	// cleanup
   4092 	prot_queue_destroy(&writer->inbox);
   4093 
   4094 	free(writer->queue_buf);
   4095 
   4096 	return 1;
   4097 }
   4098 
   4099 static int ndb_ingester_destroy(struct ndb_ingester *ingester)
   4100 {
   4101 	threadpool_destroy(&ingester->tp);
   4102 	return 1;
   4103 }
   4104 
   4105 static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
   4106 				    char *json, unsigned len, unsigned client)
   4107 {
   4108 	struct ndb_ingester_msg msg;
   4109 	msg.type = NDB_INGEST_EVENT;
   4110 
   4111 	msg.event.json = json;
   4112 	msg.event.len = len;
   4113 	msg.event.client = client;
   4114 
   4115 	return threadpool_dispatch(&ingester->tp, &msg);
   4116 }
   4117 
   4118 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize)
   4119 {
   4120 	int rc;
   4121 	MDB_txn *txn;
   4122 
   4123 	if ((rc = mdb_env_create(&lmdb->env))) {
   4124 		fprintf(stderr, "mdb_env_create failed, error %d\n", rc);
   4125 		return 0;
   4126 	}
   4127 
   4128 	if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) {
   4129 		fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
   4130 		return 0;
   4131 	}
   4132 
   4133 	if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) {
   4134 		fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc);
   4135 		return 0;
   4136 	}
   4137 
   4138 	if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) {
   4139 		fprintf(stderr, "mdb_env_open failed, error %d\n", rc);
   4140 		return 0;
   4141 	}
   4142 
   4143 	// Initialize DBs
   4144 	if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) {
   4145 		fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc);
   4146 		return 0;
   4147 	}
   4148 
   4149 	// note flatbuffer db
   4150 	if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) {
   4151 		fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc);
   4152 		return 0;
   4153 	}
   4154 
   4155 	// note metadata db
   4156 	if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) {
   4157 		fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc);
   4158 		return 0;
   4159 	}
   4160 
   4161 	// profile flatbuffer db
   4162 	if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) {
   4163 		fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc);
   4164 		return 0;
   4165 	}
   4166 
   4167 	// profile search db
   4168 	if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) {
   4169 		fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc);
   4170 		return 0;
   4171 	}
   4172 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp);
   4173 
   4174 	// ndb metadata (db version, etc)
   4175 	if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) {
   4176 		fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc);
   4177 		return 0;
   4178 	}
   4179 
   4180 	// profile last fetches
   4181 	if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) {
   4182 		fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc);
   4183 		return 0;
   4184 	}
   4185 
   4186 	// id+ts index flags
   4187 	unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED;
   4188 
   4189 	// index dbs
   4190 	if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) {
   4191 		fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
   4192 		return 0;
   4193 	}
   4194 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);
   4195 
   4196 	if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
   4197 		fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc));
   4198 		return 0;
   4199 	}
   4200 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);
   4201 
   4202 	if ((rc = mdb_dbi_open(txn, "note_kind",
   4203 			       MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED,
   4204 			       &lmdb->dbs[NDB_DB_NOTE_KIND]))) {
   4205 		fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc));
   4206 		return 0;
   4207 	}
   4208 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare);
   4209 
   4210 	if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT,
   4211 			       &lmdb->dbs[NDB_DB_NOTE_TEXT]))) {
   4212 		fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc));
   4213 		return 0;
   4214 	}
   4215 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare);
   4216 
   4217 	if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY,
   4218 			       &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) {
   4219 		fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc));
   4220 		return 0;
   4221 	}
   4222 
   4223 	if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED,
   4224 			       &lmdb->dbs[NDB_DB_NOTE_TAGS]))) {
   4225 		fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc));
   4226 		return 0;
   4227 	}
   4228 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare);
   4229 
   4230 	// Commit the transaction
   4231 	if ((rc = mdb_txn_commit(txn))) {
   4232 		fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
   4233 		return 0;
   4234 	}
   4235 
   4236 	return 1;
   4237 }
   4238 
   4239 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version)
   4240 {
   4241 	struct ndb_writer_msg msg;
   4242 	msg.type = NDB_WRITER_DBMETA;
   4243 	msg.ndb_meta.version = version;
   4244 	return ndb_writer_queue_msg(&ndb->writer, &msg);
   4245 }
   4246 
   4247 static int ndb_run_migrations(struct ndb *ndb)
   4248 {
   4249 	int64_t version, latest_version, i;
   4250 
   4251 	latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
   4252 
   4253 	if ((version = ndb_db_version(ndb)) == -1) {
   4254 		ndb_debug("run_migrations: no version found, assuming new db\n");
   4255 		version = latest_version;
   4256 
   4257 		// no version found. fresh db?
   4258 		if (!ndb_queue_write_version(ndb, version)) {
   4259 			fprintf(stderr, "run_migrations: failed writing db version");
   4260 			return 0;
   4261 		}
   4262 
   4263 		return 1;
   4264 	} else {
   4265 		ndb_debug("ndb: version %" PRIu64 " found\n", version);
   4266 	}
   4267 
   4268 	if (version < latest_version)
   4269 		ndb_debug("nostrdb: migrating v%d -> v%d\n",
   4270 				(int)version, (int)latest_version);
   4271 
   4272 	for (i = version; i < latest_version; i++) {
   4273 		if (!MIGRATIONS[i].fn(ndb)) {
   4274 			fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1));
   4275 			return 0;
   4276 		}
   4277 
   4278 		if (!ndb_queue_write_version(ndb, i+1)) {
   4279 			fprintf(stderr, "run_migrations: failed writing db version");
   4280 			return 0;
   4281 		}
   4282 
   4283 		version = i+1;
   4284 	}
   4285 
   4286 	ndb->version = version;
   4287 
   4288 	return 1;
   4289 }
   4290 
   4291 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
   4292 			     void *sub_cb_ctx)
   4293 {
   4294 	monitor->num_subscriptions = 0;
   4295 	monitor->sub_cb = cb;
   4296 	monitor->sub_cb_ctx = sub_cb_ctx;
   4297 }
   4298 
   4299 void ndb_filter_group_destroy(struct ndb_filter_group *group)
   4300 {
   4301 	struct ndb_filter *filter;
   4302 	int i;
   4303 	for (i = 0; i < group->num_filters; i++) {
   4304 		filter = &group->filters[i];
   4305 		ndb_filter_destroy(filter);
   4306 	}
   4307 }
   4308 
   4309 static void ndb_subscription_destroy(struct ndb_subscription *sub)
   4310 {
   4311 	ndb_filter_group_destroy(&sub->group);
   4312 	prot_queue_destroy(&sub->inbox);
   4313 	sub->subid = 0;
   4314 }
   4315 
   4316 static void ndb_monitor_destroy(struct ndb_monitor *monitor)
   4317 {
   4318 	int i;
   4319 
   4320 	for (i = 0; i < monitor->num_subscriptions; i++) {
   4321 		ndb_subscription_destroy(&monitor->subscriptions[i]);
   4322 	}
   4323 }
   4324 
   4325 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config)
   4326 {
   4327 	struct ndb *ndb;
   4328 	//MDB_dbi ind_id; // TODO: ind_pk, etc
   4329 
   4330 	ndb = *pndb = calloc(1, sizeof(struct ndb));
   4331 	ndb->flags = config->flags;
   4332 
   4333 	if (ndb == NULL) {
   4334 		fprintf(stderr, "ndb_init: malloc failed\n");
   4335 		return 0;
   4336 	}
   4337 
   4338 	if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
   4339 		return 0;
   4340 
   4341 	ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx);
   4342 
   4343 	if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) {
   4344 		fprintf(stderr, "ndb_writer_init failed\n");
   4345 		return 0;
   4346 	}
   4347 
   4348 	if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
   4349 		fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
   4350 				config->ingester_threads);
   4351 		return 0;
   4352 	}
   4353 
   4354 	if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) &&
   4355 			  !ndb_run_migrations(ndb)) {
   4356 		fprintf(stderr, "failed to run migrations\n");
   4357 		return 0;
   4358 	}
   4359 
   4360 	// Initialize LMDB environment and spin up threads
   4361 	return 1;
   4362 }
   4363 
   4364 void ndb_destroy(struct ndb *ndb)
   4365 {
   4366 	if (ndb == NULL)
   4367 		return;
   4368 
   4369 	// ingester depends on writer and must be destroyed first
   4370 	ndb_ingester_destroy(&ndb->ingester);
   4371 	ndb_writer_destroy(&ndb->writer);
   4372 	ndb_monitor_destroy(&ndb->monitor);
   4373 
   4374 	mdb_env_close(ndb->lmdb.env);
   4375 
   4376 	free(ndb);
   4377 }
   4378 
   4379 // Process a nostr event from a client
   4380 //
   4381 // ie: ["EVENT", {"content":"..."} ...]
   4382 //
   4383 // The client-sent variation of ndb_process_event
   4384 int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
   4385 {
   4386 	// Since we need to return as soon as possible, and we're not
   4387 	// making any assumptions about the lifetime of the string, we
   4388 	// definitely need to copy the json here. In the future once we
   4389 	// have our thread that manages a websocket connection, we can
   4390 	// avoid the copy and just use the buffer we get from that
   4391 	// thread.
   4392 	char *json_copy = strdupn(json, len);
   4393 	if (json_copy == NULL)
   4394 		return 0;
   4395 
   4396 	return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1);
   4397 }
   4398 
   4399 // Process anostr event from a relay,
   4400 //
   4401 // ie: ["EVENT", "subid", {"content":"..."}...]
   4402 //
   4403 // This function returns as soon as possible, first copying the passed
   4404 // json and then queueing it up for processing. Worker threads then take
   4405 // the json and process it.
   4406 //
   4407 // Processing:
   4408 //
   4409 // 1. The event is parsed into ndb_notes and the signature is validated
   4410 // 2. A quick lookup is made on the database to see if we already have
   4411 //    the note id, if we do we don't need to waste time on json parsing
   4412 //    or note validation.
   4413 // 3. Once validation is done we pass it to the writer queue for writing
   4414 //    to LMDB.
   4415 //
   4416 int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
   4417 {
   4418 	// Since we need to return as soon as possible, and we're not
   4419 	// making any assumptions about the lifetime of the string, we
   4420 	// definitely need to copy the json here. In the future once we
   4421 	// have our thread that manages a websocket connection, we can
   4422 	// avoid the copy and just use the buffer we get from that
   4423 	// thread.
   4424 	char *json_copy = strdupn(json, json_len);
   4425 	if (json_copy == NULL)
   4426 		return 0;
   4427 
   4428 	return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0);
   4429 }
   4430 
   4431 
   4432 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
   4433 {
   4434 	const char *start, *end, *very_end;
   4435 	start = ldjson;
   4436 	end = start + json_len;
   4437 	very_end = ldjson + json_len;
   4438 	int (* process)(struct ndb *, const char *, int);
   4439 #if DEBUG
   4440 	int processed = 0;
   4441 #endif
   4442 	process = client ? ndb_process_client_event : ndb_process_event;
   4443 
   4444 	while ((end = fast_strchr(start, '\n', very_end - start))) {
   4445 		//printf("processing '%.*s'\n", (int)(end-start), start);
   4446 		if (!process(ndb, start, end - start)) {
   4447 			ndb_debug("ndb_process_client_event failed\n");
   4448 			return 0;
   4449 		}
   4450 		start = end + 1;
   4451 #if DEBUG
   4452 		processed++;
   4453 #endif
   4454 	}
   4455 
   4456 #if DEBUG
   4457 	ndb_debug("ndb_process_events: processed %d events\n", processed);
   4458 #endif
   4459 
   4460 	return 1;
   4461 }
   4462 
   4463 int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
   4464 {
   4465 	char *line = NULL;
   4466 	size_t len = 0;
   4467 	ssize_t nread;
   4468 
   4469 	while ((nread = getline(&line, &len, fp)) != -1) {
   4470 		if (line == NULL)
   4471 			break;
   4472 		ndb_process_event(ndb, line, len);
   4473 	}
   4474 
   4475 	if (line)
   4476 		free(line);
   4477 
   4478 	return 1;
   4479 }
   4480 
   4481 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
   4482 {
   4483 	return _ndb_process_events(ndb, ldjson, json_len, 1);
   4484 }
   4485 
   4486 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
   4487 {
   4488 	return _ndb_process_events(ndb, ldjson, json_len, 0);
   4489 }
   4490 
   4491 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
   4492 {
   4493 	return cursor_push_u16(cur, tag->count);
   4494 }
   4495 
   4496 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf,
   4497 		     size_t bufsize)
   4498 {
   4499 	struct ndb_note *note;
   4500 	int half, size, str_indices_size;
   4501 
   4502 	// come on bruh
   4503 	if (bufsize < sizeof(struct ndb_note) * 2)
   4504 		return 0;
   4505 
   4506 	str_indices_size = bufsize / 32;
   4507 	size = bufsize - str_indices_size;
   4508 	half = size / 2;
   4509 
   4510 	//debug("size %d half %d str_indices %d\n", size, half, str_indices_size);
   4511 
   4512 	// make a safe cursor of our available memory
   4513 	make_cursor(buf, buf + bufsize, &builder->mem);
   4514 
   4515 	note = builder->note = (struct ndb_note *)buf;
   4516 
   4517 	// take slices of the memory into subcursors
   4518 	if (!(cursor_slice(&builder->mem, &builder->note_cur, half) &&
   4519 	      cursor_slice(&builder->mem, &builder->strings, half) &&
   4520 	      cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) {
   4521 		return 0;
   4522 	}
   4523 
   4524 	memset(note, 0, sizeof(*note));
   4525 	builder->note_cur.p += sizeof(*note);
   4526 
   4527 	note->strings = builder->strings.start - buf;
   4528 	note->version = 1;
   4529 
   4530 	return 1;
   4531 }
   4532 
   4533 
   4534 
   4535 static inline int ndb_json_parser_init(struct ndb_json_parser *p,
   4536 				       const char *json, int json_len,
   4537 				       unsigned char *buf, int bufsize)
   4538 {
   4539 	int half = bufsize / 2;
   4540 
   4541 	unsigned char *tok_start = buf + half;
   4542 	unsigned char *tok_end = buf + bufsize;
   4543 
   4544 	p->toks = (jsmntok_t*)tok_start;
   4545 	p->toks_end = (jsmntok_t*)tok_end;
   4546 	p->num_tokens = 0;
   4547 	p->json = json;
   4548 	p->json_len = json_len;
   4549 
   4550 	// ndb_builder gets the first half of the buffer, and jsmn gets the
   4551 	// second half. I like this way of alloating memory (without actually
   4552 	// dynamically allocating memory). You get one big chunk upfront and
   4553 	// then submodules can recursively subdivide it. Maybe you could do
   4554 	// something even more clever like golden-ratio style subdivision where
   4555 	// the more important stuff gets a larger chunk and then it spirals
   4556 	// downward into smaller chunks. Thanks for coming to my TED talk.
   4557 
   4558 	if (!ndb_builder_init(&p->builder, buf, half))
   4559 		return 0;
   4560 
   4561 	jsmn_init(&p->json_parser);
   4562 
   4563 	return 1;
   4564 }
   4565 
   4566 static inline int ndb_json_parser_parse(struct ndb_json_parser *p,
   4567 					struct ndb_id_cb *cb)
   4568 {
   4569 	jsmntok_t *tok;
   4570 	int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks);
   4571 	int res =
   4572 		jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL);
   4573 
   4574 	// got an ID!
   4575 	if (res == -42) {
   4576 		tok = &p->toks[p->json_parser.toknext-1];
   4577 
   4578 		switch (cb->fn(cb->data, p->json + tok->start)) {
   4579 		case NDB_IDRES_CONT:
   4580 			res = jsmn_parse(&p->json_parser, p->json, p->json_len,
   4581 					 p->toks, cap, 0);
   4582 			break;
   4583 		case NDB_IDRES_STOP:
   4584 			return -42;
   4585 		}
   4586 	} else if (res == 0) {
   4587 		return 0;
   4588 	}
   4589 
   4590 	p->num_tokens = res;
   4591 	p->i = 0;
   4592 
   4593 	return 1;
   4594 }
   4595 
   4596 static inline int toksize(jsmntok_t *tok)
   4597 {
   4598 	return tok->end - tok->start;
   4599 }
   4600 
   4601 
   4602 
   4603 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2)
   4604 {
   4605 	switch (c2) {
   4606 	case 't':  return cursor_push_byte(cur, '\t');
   4607 	case 'n':  return cursor_push_byte(cur, '\n');
   4608 	case 'r':  return cursor_push_byte(cur, '\r');
   4609 	case 'b':  return cursor_push_byte(cur, '\b');
   4610 	case 'f':  return cursor_push_byte(cur, '\f');
   4611 	case '\\': return cursor_push_byte(cur, '\\');
   4612 	case '/':  return cursor_push_byte(cur, '/');
   4613 	case '"':  return cursor_push_byte(cur, '"');
   4614 	case 'u':
   4615 		// these aren't handled yet
   4616 		return 0;
   4617 	default:
   4618 		return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2);
   4619 	}
   4620 }
   4621 
   4622 static int cursor_push_escaped_char(struct cursor *cur, char c)
   4623 {
   4624         switch (c) {
   4625         case '"':  return cursor_push_str(cur, "\\\"");
   4626         case '\\': return cursor_push_str(cur, "\\\\");
   4627         case '\b': return cursor_push_str(cur, "\\b");
   4628         case '\f': return cursor_push_str(cur, "\\f");
   4629         case '\n': return cursor_push_str(cur, "\\n");
   4630         case '\r': return cursor_push_str(cur, "\\r");
   4631         case '\t': return cursor_push_str(cur, "\\t");
   4632         // TODO: \u hex hex hex hex
   4633         }
   4634         return cursor_push_byte(cur, c);
   4635 }
   4636 
   4637 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len)
   4638 {
   4639 	int i;
   4640 
   4641 	if (len % 2 != 0)
   4642 		return 0;
   4643 
   4644         if (!cursor_push_byte(cur, '"'))
   4645                 return 0;
   4646 
   4647 	for (i = 0; i < len; i++) {
   4648 		unsigned int c = ((const unsigned char *)buf)[i];
   4649 		if (!cursor_push_byte(cur, hexchar(c >> 4)))
   4650 			return 0;
   4651 		if (!cursor_push_byte(cur, hexchar(c & 0xF)))
   4652 			return 0;
   4653 	}
   4654 
   4655         if (!cursor_push_byte(cur, '"'))
   4656                 return 0;
   4657 
   4658 	return 1;
   4659 }
   4660 
   4661 static int cursor_push_jsonstr(struct cursor *cur, const char *str)
   4662 {
   4663 	int i;
   4664         int len;
   4665 
   4666 	len = strlen(str);
   4667 
   4668         if (!cursor_push_byte(cur, '"'))
   4669                 return 0;
   4670 
   4671         for (i = 0; i < len; i++) {
   4672                 if (!cursor_push_escaped_char(cur, str[i]))
   4673                         return 0;
   4674         }
   4675 
   4676         if (!cursor_push_byte(cur, '"'))
   4677                 return 0;
   4678 
   4679         return 1;
   4680 }
   4681 
   4682 
   4683 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str)
   4684 {
   4685 	if (str.flag == NDB_PACKED_ID)
   4686 		return cursor_push_hex_str(cur, str.id, 32);
   4687 
   4688 	return cursor_push_jsonstr(cur, str.str);
   4689 }
   4690 
   4691 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note,
   4692 				struct ndb_tag *tag)
   4693 {
   4694         int i;
   4695 
   4696         if (!cursor_push_byte(cur, '['))
   4697                 return 0;
   4698 
   4699         for (i = 0; i < tag->count; i++) {
   4700                 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i)))
   4701                         return 0;
   4702                 if (i != tag->count-1 && !cursor_push_byte(cur, ','))
   4703 			return 0;
   4704         }
   4705 
   4706         return cursor_push_byte(cur, ']');
   4707 }
   4708 
   4709 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note)
   4710 {
   4711 	int i;
   4712 	struct ndb_iterator iter, *it = &iter;
   4713 	ndb_tags_iterate_start(note, it);
   4714 
   4715         if (!cursor_push_byte(cur, '['))
   4716                 return 0;
   4717 
   4718 	i = 0;
   4719 	while (ndb_tags_iterate_next(it)) {
   4720 		if (!cursor_push_json_tag(cur, note, it->tag))
   4721 			return 0;
   4722                 if (i != note->tags.count-1 && !cursor_push_str(cur, ","))
   4723 			return 0;
   4724 		i++;
   4725 	}
   4726 
   4727         if (!cursor_push_byte(cur, ']'))
   4728                 return 0;
   4729 
   4730 	return 1;
   4731 }
   4732 
   4733 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen)
   4734 {
   4735 	char timebuf[16] = {0};
   4736 	char kindbuf[16] = {0};
   4737 	char pubkey[65];
   4738 	struct cursor cur;
   4739 	int ok;
   4740 
   4741 	if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey))
   4742 		return 0;
   4743 
   4744 	make_cursor(buf, buf + buflen, &cur);
   4745 
   4746 	// TODO: update in 2106 ...
   4747 	snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at);
   4748 	snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind);
   4749 
   4750 	ok =
   4751 		cursor_push_str(&cur, "[0,\"") &&
   4752 		cursor_push_str(&cur, pubkey) &&
   4753 		cursor_push_str(&cur, "\",") &&
   4754 		cursor_push_str(&cur, timebuf) &&
   4755 		cursor_push_str(&cur, ",") &&
   4756 		cursor_push_str(&cur, kindbuf) &&
   4757 		cursor_push_str(&cur, ",") &&
   4758 		cursor_push_json_tags(&cur, ev) &&
   4759 		cursor_push_str(&cur, ",") &&
   4760 		cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) &&
   4761 		cursor_push_str(&cur, "]");
   4762 
   4763 	if (!ok)
   4764 		return 0;
   4765 
   4766 	return cur.p - cur.start;
   4767 }
   4768 
   4769 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len)
   4770 {
   4771 	int i;
   4772 	unsigned char chr;
   4773 	if (c->p + (len * 2) >= c->end)
   4774 		return 0;
   4775 
   4776 	for (i = 0; i < len; i++) {
   4777 		chr = bytes[i];
   4778 
   4779 		*(c->p++) = hexchar(chr >> 4);
   4780 		*(c->p++) = hexchar(chr & 0xF);
   4781 	}
   4782 
   4783 	return 1;
   4784 }
   4785 
   4786 static int cursor_push_int_str(struct cursor *c, uint64_t num)
   4787 {
   4788 	char timebuf[16] = {0};
   4789 	snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num);
   4790 	return cursor_push_str(c, timebuf);
   4791 }
   4792 
   4793 int ndb_note_json(struct ndb_note *note, char *buf, int buflen)
   4794 {
   4795 	struct cursor cur, *c = &cur;
   4796 
   4797 	make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur);
   4798 
   4799 	int ok = cursor_push_str(c, "{\"id\":\"") &&
   4800 	       cursor_push_hex(c, ndb_note_id(note), 32) &&
   4801 	       cursor_push_str(c, "\",\"pubkey\":\"") &&
   4802 	       cursor_push_hex(c, ndb_note_pubkey(note), 32) &&
   4803 	       cursor_push_str(c, "\",\"created_at\":") &&
   4804 	       cursor_push_int_str(c, ndb_note_created_at(note)) &&
   4805 	       cursor_push_str(c, ",\"kind\":") &&
   4806 	       cursor_push_int_str(c, ndb_note_kind(note)) &&
   4807 	       cursor_push_str(c, ",\"tags\":") &&
   4808 	       cursor_push_json_tags(c, note) &&
   4809 	       cursor_push_str(c, ",\"content\":") &&
   4810 	       cursor_push_jsonstr(c, ndb_note_content(note)) &&
   4811 	       cursor_push_str(c, ",\"sig\":\"") &&
   4812 	       cursor_push_hex(c, ndb_note_sig(note), 64) &&
   4813 	       cursor_push_c_str(c, "\"}");
   4814 
   4815 	if (!ok) {
   4816 		return 0;
   4817 	}
   4818 
   4819 	return cur.p - cur.start;
   4820 }
   4821 
   4822 static int cursor_push_json_elem_array(struct cursor *cur,
   4823 				       const struct ndb_filter *filter,
   4824 				       struct ndb_filter_elements *elems)
   4825 {
   4826 	int i;
   4827 	unsigned char *id;
   4828 	const char *str;
   4829 	uint64_t val;
   4830 
   4831 	if (!cursor_push_byte(cur, '['))
   4832 		return 0;
   4833 
   4834 	for (i = 0; i < elems->count; i++) {
   4835 
   4836 		switch (elems->field.elem_type)  {
   4837 		case NDB_ELEMENT_STRING:
   4838 			str = ndb_filter_get_string_element(filter, elems, i);
   4839 			if (!cursor_push_jsonstr(cur, str))
   4840 				return 0;
   4841 			break;
   4842 		case NDB_ELEMENT_ID:
   4843 			id = ndb_filter_get_id_element(filter, elems, i);
   4844 			if (!cursor_push_hex_str(cur, id, 32))
   4845 				return 0;
   4846 			break;
   4847 		case NDB_ELEMENT_INT:
   4848 			val = ndb_filter_get_int_element(elems, i);
   4849 			if (!cursor_push_int_str(cur, val))
   4850 				return 0;
   4851 			break;
   4852 		case NDB_ELEMENT_UNKNOWN:
   4853 			ndb_debug("unknown element in cursor_push_json_elem_array");
   4854 			return 0;
   4855 		}
   4856 
   4857 		if (i != elems->count-1) {
   4858 			if (!cursor_push_byte(cur, ','))
   4859 				return 0;
   4860 		}
   4861 	}
   4862 
   4863 	if (!cursor_push_str(cur, "]"))
   4864 		return 0;
   4865 
   4866 	return 1;
   4867 }
   4868 
   4869 int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen)
   4870 {
   4871 	struct cursor cur, *c = &cur;
   4872 	struct ndb_filter_elements *elems;
   4873 	int i;
   4874 
   4875 	if (!filter->finalized) {
   4876 		ndb_debug("filter not finalized in ndb_filter_json\n");
   4877 		return 0;
   4878 	}
   4879 
   4880 	make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c);
   4881 
   4882 	if (!cursor_push_str(c, "{"))
   4883 		return 0;
   4884 
   4885 	for (i = 0; i < filter->num_elements; i++) {
   4886 		elems = ndb_filter_get_elements(filter, i);
   4887 		switch (elems->field.type) {
   4888 		case NDB_FILTER_IDS:
   4889 			if (!cursor_push_str(c, "\"ids\":"))
   4890 				return 0;
   4891 			if (!cursor_push_json_elem_array(c, filter, elems))
   4892 				return 0;
   4893 			break;
   4894 		case NDB_FILTER_AUTHORS:
   4895 			if (!cursor_push_str(c, "\"authors\":"))
   4896 				return 0;
   4897 			if (!cursor_push_json_elem_array(c, filter, elems))
   4898 				return 0;
   4899 			break;
   4900 		case NDB_FILTER_KINDS:
   4901 			if (!cursor_push_str(c, "\"kinds\":"))
   4902 				return 0;
   4903 			if (!cursor_push_json_elem_array(c, filter, elems))
   4904 				return 0;
   4905 			break;
   4906 		case NDB_FILTER_TAGS:
   4907 			if (!cursor_push_str(c, "\"#"))
   4908 				return 0;
   4909 			if (!cursor_push_byte(c, elems->field.tag))
   4910 				return 0;
   4911 			if (!cursor_push_str(c, "\":"))
   4912 				return 0;
   4913 			if (!cursor_push_json_elem_array(c, filter, elems))
   4914 				return 0;
   4915 			break;
   4916 		case NDB_FILTER_SINCE:
   4917 			if (!cursor_push_str(c, "\"since\":"))
   4918 				return 0;
   4919 			if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
   4920 				return 0;
   4921 			break;
   4922 		case NDB_FILTER_UNTIL:
   4923 			if (!cursor_push_str(c, "\"until\":"))
   4924 				return 0;
   4925 			if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
   4926 				return 0;
   4927 			break;
   4928 		case NDB_FILTER_LIMIT:
   4929 			if (!cursor_push_str(c, "\"limit\":"))
   4930 				return 0;
   4931 			if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
   4932 				return 0;
   4933 			break;
   4934 		}
   4935 
   4936 		if (i != filter->num_elements-1) {
   4937 			if (!cursor_push_byte(c, ',')) {
   4938 				return 0;
   4939 			}
   4940 		}
   4941 
   4942 	}
   4943 
   4944 	if (!cursor_push_c_str(c, "}"))
   4945 		return 0;
   4946 
   4947 	return cur.p - cur.start;
   4948 }
   4949 
   4950 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) {
   4951 	int len;
   4952 
   4953 	if (!(len = ndb_event_commitment(note, buf, buflen)))
   4954 		return 0;
   4955 
   4956 	//fprintf(stderr, "%.*s\n", len, buf);
   4957 
   4958 	sha256((struct sha256*)note->id, buf, len);
   4959 
   4960 	return 1;
   4961 }
   4962 
   4963 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32],
   4964 		unsigned char sig[64])
   4965 {
   4966 	unsigned char aux[32];
   4967 	secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair;
   4968 
   4969 	if (!fill_random(aux, sizeof(aux)))
   4970 		return 0;
   4971 
   4972 	secp256k1_context *ctx =
   4973 		secp256k1_context_create(SECP256K1_CONTEXT_NONE);
   4974 
   4975 	return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux);
   4976 }
   4977 
   4978 int ndb_create_keypair(struct ndb_keypair *kp)
   4979 {
   4980 	secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair;
   4981 	secp256k1_xonly_pubkey pubkey;
   4982 
   4983 	secp256k1_context *ctx =
   4984 		secp256k1_context_create(SECP256K1_CONTEXT_NONE);;
   4985 
   4986 	/* Try to create a keypair with a valid context, it should only
   4987 	 * fail if the secret key is zero or out of range. */
   4988 	if (!secp256k1_keypair_create(ctx, keypair, kp->secret))
   4989 		return 0;
   4990 
   4991 	if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair))
   4992 		return 0;
   4993 
   4994 	/* Serialize the public key. Should always return 1 for a valid public key. */
   4995 	return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey);
   4996 }
   4997 
   4998 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair)
   4999 {
   5000 	if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) {
   5001 		fprintf(stderr, "could not hex decode secret key\n");
   5002 		return 0;
   5003 	}
   5004 
   5005 	return ndb_create_keypair(keypair);
   5006 }
   5007 
   5008 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note,
   5009 			 struct ndb_keypair *keypair)
   5010 {
   5011 	int strings_len = builder->strings.p - builder->strings.start;
   5012 	unsigned char *note_end = builder->note_cur.p + strings_len;
   5013 	int total_size = note_end - builder->note_cur.start;
   5014 
   5015 	// move the strings buffer next to the end of our ndb_note
   5016 	memmove(builder->note_cur.p, builder->strings.start, strings_len);
   5017 
   5018 	// set the strings location
   5019 	builder->note->strings = builder->note_cur.p - builder->note_cur.start;
   5020 
   5021 	// record the total size
   5022 	//builder->note->size = total_size;
   5023 
   5024 	*note = builder->note;
   5025 
   5026 	// generate id and sign if we're building this manually
   5027 	if (keypair) {
   5028 		// use the remaining memory for building our id buffer
   5029 		unsigned char *end   = builder->mem.end;
   5030 		unsigned char *start = (unsigned char*)(*note) + total_size;
   5031 
   5032 		ndb_builder_set_pubkey(builder, keypair->pubkey);
   5033 
   5034 		if (!ndb_calculate_id(builder->note, start, end - start))
   5035 			return 0;
   5036 
   5037 		if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig))
   5038 			return 0;
   5039 	}
   5040 
   5041 	// make sure we're aligned as a whole
   5042 	total_size = (total_size + 7) & ~7;
   5043 	assert((total_size % 8) == 0);
   5044 	return total_size;
   5045 }
   5046 
   5047 struct ndb_note * ndb_builder_note(struct ndb_builder *builder)
   5048 {
   5049 	return builder->note;
   5050 }
   5051 
   5052 static union ndb_packed_str ndb_offset_str(uint32_t offset)
   5053 {
   5054 	// ensure accidents like -1 don't corrupt our packed_str
   5055 	union ndb_packed_str str;
   5056 	// most significant byte is reserved for ndb_packtype
   5057 	str.offset = offset & 0xFFFFFF;
   5058 	return str;
   5059 }
   5060 
   5061 
   5062 /// find an existing string via str_indices. these indices only exist in the
   5063 /// builder phase just for this purpose.
   5064 static inline int ndb_builder_find_str(struct ndb_builder *builder,
   5065 				       const char *str, int len,
   5066 				       union ndb_packed_str *pstr)
   5067 {
   5068 	// find existing matching string to avoid duplicate strings
   5069 	int indices = cursor_count(&builder->str_indices, sizeof(uint32_t));
   5070 	for (int i = 0; i < indices; i++) {
   5071 		uint32_t index = ((uint32_t*)builder->str_indices.start)[i];
   5072 		const char *some_str = (const char*)builder->strings.start + index;
   5073 
   5074 		if (!memcmp(some_str, str, len) && some_str[len] == '\0') {
   5075 			// found an existing matching str, use that index
   5076 			*pstr = ndb_offset_str(index);
   5077 			return 1;
   5078 		}
   5079 	}
   5080 
   5081 	return 0;
   5082 }
   5083 
   5084 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str,
   5085 				int len, union ndb_packed_str *pstr)
   5086 {
   5087 	uint32_t loc;
   5088 
   5089 	// no string found, push a new one
   5090 	loc = builder->strings.p - builder->strings.start;
   5091 	if (!(cursor_push(&builder->strings, (unsigned char*)str, len) &&
   5092 	      cursor_push_byte(&builder->strings, '\0'))) {
   5093 		return 0;
   5094 	}
   5095 
   5096 	*pstr = ndb_offset_str(loc);
   5097 
   5098 	// record in builder indices. ignore return value, if we can't cache it
   5099 	// then whatever
   5100 	cursor_push_u32(&builder->str_indices, loc);
   5101 
   5102 	return 1;
   5103 }
   5104 
   5105 static int ndb_builder_push_packed_id(struct ndb_builder *builder,
   5106 				      unsigned char *id,
   5107 				      union ndb_packed_str *pstr)
   5108 {
   5109 	// Don't both find id duplicates. very rarely are they duplicated
   5110 	// and it slows things down quite a bit. If we really care about this
   5111 	// We can switch to a hash table.
   5112 	//if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) {
   5113 	//	pstr->packed.flag = NDB_PACKED_ID;
   5114 	//	return 1;
   5115 	//}
   5116 
   5117 	if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) {
   5118 		pstr->packed.flag = NDB_PACKED_ID;
   5119 		return 1;
   5120 	}
   5121 
   5122 	return 0;
   5123 }
   5124 
   5125 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2)
   5126 {
   5127 	union ndb_packed_str str;
   5128 	str.packed.flag = NDB_PACKED_STR;
   5129 	str.packed.str[0] = c1;
   5130 	str.packed.str[1] = c2;
   5131 	str.packed.str[2] = '\0';
   5132 	return str;
   5133 }
   5134 
   5135 static union ndb_packed_str ndb_char_to_packed_str(char c)
   5136 {
   5137 	union ndb_packed_str str;
   5138 	str.packed.flag = NDB_PACKED_STR;
   5139 	str.packed.str[0] = c;
   5140 	str.packed.str[1] = '\0';
   5141 	return str;
   5142 }
   5143 
   5144 
   5145 /// Check for small strings to pack
   5146 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder,
   5147 					      const char *str, int len,
   5148 					      union ndb_packed_str *pstr,
   5149 					      int pack_ids)
   5150 {
   5151 	unsigned char id_buf[32];
   5152 
   5153 	if (len == 0) {
   5154 		*pstr = ndb_char_to_packed_str(0);
   5155 		return 1;
   5156 	} else if (len == 1) {
   5157 		*pstr = ndb_char_to_packed_str(str[0]);
   5158 		return 1;
   5159 	} else if (len == 2) {
   5160 		*pstr = ndb_chars_to_packed_str(str[0], str[1]);
   5161 		return 1;
   5162 	} else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) {
   5163 		return ndb_builder_push_packed_id(builder, id_buf, pstr);
   5164 	}
   5165 
   5166 	return 0;
   5167 }
   5168 
   5169 
   5170 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder,
   5171 					 const char *str, int len,
   5172 					 union ndb_packed_str *pstr)
   5173 {
   5174 	if (ndb_builder_find_str(builder, str, len, pstr))
   5175 		return 1;
   5176 
   5177 	return ndb_builder_push_str(builder, str, len, pstr);
   5178 }
   5179 
   5180 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len,
   5181 			 union ndb_packed_str *pstr, int pack_ids)
   5182 {
   5183 	if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids))
   5184 		return 1;
   5185 
   5186 	return ndb_builder_push_unpacked_str(builder, str, len, pstr);
   5187 }
   5188 
   5189 int ndb_builder_set_content(struct ndb_builder *builder, const char *content,
   5190 			    int len)
   5191 {
   5192 	int pack_ids = 0;
   5193 	builder->note->content_length = len;
   5194 	return ndb_builder_make_str(builder, content, len,
   5195 				    &builder->note->content, pack_ids);
   5196 }
   5197 
   5198 
   5199 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len,
   5200 			 const char *s)
   5201 {
   5202 	if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len &&
   5203 	    memcmp(json + tok->start, s, tok_len) == 0) {
   5204 		return 1;
   5205 	}
   5206 	return 0;
   5207 }
   5208 
   5209 static int ndb_builder_finalize_tag(struct ndb_builder *builder,
   5210 				    union ndb_packed_str offset)
   5211 {
   5212 	if (!cursor_push_u32(&builder->note_cur, offset.offset))
   5213 		return 0;
   5214 	builder->current_tag->count++;
   5215 	return 1;
   5216 }
   5217 
   5218 /// Unescape and push json strings
   5219 static int ndb_builder_make_json_str(struct ndb_builder *builder,
   5220 				     const char *str, int len,
   5221 				     union ndb_packed_str *pstr,
   5222 				     int *written, int pack_ids)
   5223 {
   5224 	// let's not care about de-duping these. we should just unescape
   5225 	// in-place directly into the strings table.
   5226 	if (written)
   5227 		*written = len;
   5228 
   5229 	const char *p, *end, *start;
   5230 	unsigned char *builder_start;
   5231 
   5232 	// always try compact strings first
   5233 	if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids))
   5234 		return 1;
   5235 
   5236 	end = str + len;
   5237 	start = str; // Initialize start to the beginning of the string
   5238 
   5239 	*pstr = ndb_offset_str(builder->strings.p - builder->strings.start);
   5240 	builder_start = builder->strings.p;
   5241 
   5242 	for (p = str; p < end; p++) {
   5243 		if (*p == '\\' && p+1 < end) {
   5244 			// Push the chunk of unescaped characters before this escape sequence
   5245 			if (start < p && !cursor_push(&builder->strings,
   5246 						(unsigned char *)start,
   5247 						p - start)) {
   5248 				return 0;
   5249 			}
   5250 
   5251 			if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1)))
   5252 				return 0;
   5253 
   5254 			p++; // Skip the character following the backslash
   5255 			start = p + 1; // Update the start pointer to the next character
   5256 		}
   5257 	}
   5258 
   5259 	// Handle the last chunk after the last escape sequence (or if there are no escape sequences at all)
   5260 	if (start < p && !cursor_push(&builder->strings, (unsigned char *)start,
   5261 				      p - start)) {
   5262 		return 0;
   5263 	}
   5264 
   5265 	if (written)
   5266 		*written = builder->strings.p - builder_start;
   5267 
   5268 	// TODO: dedupe these!?
   5269 	return cursor_push_byte(&builder->strings, '\0');
   5270 }
   5271 
   5272 static int ndb_builder_push_json_tag(struct ndb_builder *builder,
   5273 				     const char *str, int len)
   5274 {
   5275 	union ndb_packed_str pstr;
   5276 	int pack_ids = 1;
   5277 	if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids))
   5278 		return 0;
   5279 	return ndb_builder_finalize_tag(builder, pstr);
   5280 }
   5281 
   5282 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag
   5283 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p,
   5284 					   jsmntok_t *array)
   5285 {
   5286 	jsmntok_t *str_tok;
   5287 	const char *str;
   5288 
   5289 	if (array->size == 0)
   5290 		return 0;
   5291 
   5292 	if (!ndb_builder_new_tag(&p->builder))
   5293 		return 0;
   5294 
   5295 	for (int i = 0; i < array->size; i++) {
   5296 		str_tok = &array[i+1];
   5297 		str = p->json + str_tok->start;
   5298 
   5299 		if (!ndb_builder_push_json_tag(&p->builder, str,
   5300 					       toksize(str_tok))) {
   5301 			return 0;
   5302 		}
   5303 	}
   5304 
   5305 	return 1;
   5306 }
   5307 
   5308 // Push json tags into ndb data
   5309 //   [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags
   5310 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p,
   5311 						jsmntok_t *array)
   5312 {
   5313 	jsmntok_t *tag = array;
   5314 
   5315 	if (array->size == 0)
   5316 		return 1;
   5317 
   5318 	for (int i = 0; i < array->size; i++) {
   5319 		if (!ndb_builder_tag_from_json_array(p, &tag[i+1]))
   5320 			return 0;
   5321 		tag += tag[i+1].size;
   5322 	}
   5323 
   5324 	return 1;
   5325 }
   5326 
   5327 static int parse_unsigned_int(const char *start, int len, unsigned int *num)
   5328 {
   5329 	unsigned int number = 0;
   5330 	const char *p = start, *end = start + len;
   5331 	int digits = 0;
   5332 
   5333 	while (p < end) {
   5334 		char c = *p;
   5335 
   5336 		if (c < '0' || c > '9')
   5337 			break;
   5338 
   5339 		// Check for overflow
   5340 		char digit = c - '0';
   5341 		if (number > (UINT_MAX - digit) / 10)
   5342 			return 0; // Overflow detected
   5343 
   5344 		number = number * 10 + digit;
   5345 
   5346 		p++;
   5347 		digits++;
   5348 	}
   5349 
   5350 	if (digits == 0)
   5351 		return 0;
   5352 
   5353 	*num = number;
   5354 	return 1;
   5355 }
   5356 
   5357 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
   5358 			       unsigned char *buf, int bufsize, struct ndb_id_cb *cb)
   5359 {
   5360 	jsmntok_t *tok = NULL;
   5361 	int tok_len, res;
   5362 	struct ndb_json_parser parser;
   5363 
   5364 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   5365 
   5366 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
   5367 		return res;
   5368 
   5369 	if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
   5370 		return 0;
   5371 
   5372 	parser.i = 1;
   5373 	tok = &parser.toks[parser.i++];
   5374 	tok_len = toksize(tok);
   5375 	if (tok->type != JSMN_STRING)
   5376 		return 0;
   5377 
   5378 	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
   5379 		fce->evtype = NDB_FCE_EVENT;
   5380 		struct ndb_event *ev = &fce->event;
   5381 		return ndb_parse_json_note(&parser, &ev->note);
   5382 	}
   5383 
   5384 	return 0;
   5385 }
   5386 
   5387 
   5388 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
   5389 			   unsigned char *buf, int bufsize,
   5390 			   struct ndb_id_cb *cb)
   5391 {
   5392 	jsmntok_t *tok = NULL;
   5393 	int tok_len, res;
   5394 	struct ndb_json_parser parser;
   5395 
   5396 	tce->subid_len = 0;
   5397 	tce->subid = "";
   5398 
   5399 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   5400 
   5401 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
   5402 		return res;
   5403 
   5404 	if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
   5405 		return 0;
   5406 
   5407 	parser.i = 1;
   5408 	tok = &parser.toks[parser.i++];
   5409 	tok_len = toksize(tok);
   5410 	if (tok->type != JSMN_STRING)
   5411 		return 0;
   5412 
   5413 	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
   5414 		tce->evtype = NDB_TCE_EVENT;
   5415 		struct ndb_event *ev = &tce->event;
   5416 
   5417 		tok = &parser.toks[parser.i++];
   5418 		if (tok->type != JSMN_STRING)
   5419 			return 0;
   5420 
   5421 		tce->subid = json + tok->start;
   5422 		tce->subid_len = toksize(tok);
   5423 
   5424 		return ndb_parse_json_note(&parser, &ev->note);
   5425 	} else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) {
   5426 		tce->evtype = NDB_TCE_EOSE;
   5427 
   5428 		tok = &parser.toks[parser.i++];
   5429 		if (tok->type != JSMN_STRING)
   5430 			return 0;
   5431 
   5432 		tce->subid = json + tok->start;
   5433 		tce->subid_len = toksize(tok);
   5434 		return 1;
   5435 	} else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) {
   5436 		if (parser.num_tokens != 5)
   5437 			return 0;
   5438 
   5439 		struct ndb_command_result *cr = &tce->command_result;
   5440 
   5441 		tce->evtype = NDB_TCE_OK;
   5442 
   5443 		tok = &parser.toks[parser.i++];
   5444 		if (tok->type != JSMN_STRING)
   5445 			return 0;
   5446 
   5447 		tce->subid = json + tok->start;
   5448 		tce->subid_len = toksize(tok);
   5449 
   5450 		tok = &parser.toks[parser.i++];
   5451 		if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0)
   5452 			return 0;
   5453 
   5454 		cr->ok = (json + tok->start)[0] == 't';
   5455 
   5456 		tok = &parser.toks[parser.i++];
   5457 		if (tok->type != JSMN_STRING)
   5458 			return 0;
   5459 
   5460 		tce->command_result.msg = json + tok->start;
   5461 		tce->command_result.msglen = toksize(tok);
   5462 
   5463 		return 1;
   5464 	} else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) {
   5465 		tce->evtype = NDB_TCE_AUTH;
   5466 
   5467 		tok = &parser.toks[parser.i++];
   5468 		if (tok->type != JSMN_STRING)
   5469 			return 0;
   5470 
   5471 		tce->subid = json + tok->start;
   5472 		tce->subid_len = toksize(tok);
   5473 
   5474 		return 1;
   5475 	}
   5476 
   5477 	return 0;
   5478 }
   5479 
   5480 static enum ndb_filter_fieldtype
   5481 ndb_filter_parse_field(const char *tok, int len, char *tagchar)
   5482 {
   5483 	*tagchar = 0;
   5484 
   5485 	if (len == 0)
   5486 		return 0;
   5487 
   5488 	if (len == 7 && !strncmp(tok, "authors", 7)) {
   5489 		return NDB_FILTER_AUTHORS;
   5490 	} else if (len == 3 && !strncmp(tok, "ids", 3)) {
   5491 		return NDB_FILTER_IDS;
   5492 	} else if (len == 5 && !strncmp(tok, "kinds", 5)) {
   5493 		return NDB_FILTER_KINDS;
   5494 	} else if (len == 2 && tok[0] == '#') {
   5495 		*tagchar = tok[1];
   5496 		return NDB_FILTER_TAGS;
   5497 	} else if (len == 5 && !strncmp(tok, "since", 5)) {
   5498 		return NDB_FILTER_SINCE;
   5499 	} else if (len == 5 && !strncmp(tok, "until", 5)) {
   5500 		return NDB_FILTER_UNTIL;
   5501 	} else if (len == 5 && !strncmp(tok, "limit", 5)) {
   5502 		return NDB_FILTER_LIMIT;
   5503 	}
   5504 
   5505 	return 0;
   5506 }
   5507 
   5508 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser,
   5509 				     struct ndb_filter *filter)
   5510 {
   5511 	jsmntok_t *tok;
   5512 	const char *start;
   5513 	unsigned char hexbuf[32];
   5514 	int tok_len, i, size;
   5515 
   5516 	tok = &parser->toks[parser->i++];
   5517 
   5518 	if (tok->type != JSMN_ARRAY) {
   5519 		ndb_debug("parse_json_ids: not an array\n");
   5520 		return 0;
   5521 	}
   5522 
   5523 	size = tok->size;
   5524 
   5525 	for (i = 0; i < size; parser->i++, i++) {
   5526 		tok = &parser->toks[parser->i];
   5527 		start = parser->json + tok->start;
   5528 		tok_len = toksize(tok);
   5529 
   5530 		if (tok->type != JSMN_STRING) {
   5531 			ndb_debug("parse_json_ids: not a string '%d'\n", tok->type);
   5532 			return 0;
   5533 		}
   5534 
   5535 		if (tok_len != 64) {
   5536 			ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start);
   5537 			return 0;
   5538 		}
   5539 
   5540 		// id
   5541 		if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) {
   5542 			ndb_debug("parse_json_ids: hex decode failed\n");
   5543 			return 0;
   5544 		}
   5545 
   5546 		ndb_debug("adding id elem\n");
   5547 		if (!ndb_filter_add_id_element(filter, hexbuf)) {
   5548 			ndb_debug("parse_json_ids: failed to add id element\n");
   5549 			return 0;
   5550 		}
   5551 	}
   5552 
   5553 	parser->i--;
   5554 	return 1;
   5555 }
   5556 
   5557 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser,
   5558 				       struct ndb_filter *filter)
   5559 {
   5560 	jsmntok_t *tok;
   5561 	const char *start;
   5562 	int tok_len;
   5563 	unsigned char hexbuf[32];
   5564 	enum ndb_generic_element_type typ;
   5565 	tok = NULL;
   5566 	int i, size;
   5567 
   5568 	tok = &parser->toks[parser->i++];
   5569 
   5570 	if (tok->type != JSMN_ARRAY)
   5571 		return 0;
   5572 
   5573 	size = tok->size;
   5574 
   5575 	for (i = 0; i < size; i++, parser->i++) {
   5576 		tok = &parser->toks[parser->i];
   5577 		start = parser->json + tok->start;
   5578 		tok_len = toksize(tok);
   5579 
   5580 		if (tok->type != JSMN_STRING)
   5581 			return 0;
   5582 
   5583 		if (i == 0) {
   5584 			if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) {
   5585 				typ = NDB_ELEMENT_ID;
   5586 				if (!ndb_filter_add_id_element(filter, hexbuf)) {
   5587 					ndb_debug("failed to push id elem\n");
   5588 					return 0;
   5589 				}
   5590 			} else {
   5591 				typ = NDB_ELEMENT_STRING;
   5592 				if (!ndb_filter_add_str_element_len(filter, start, tok_len))
   5593 					return 0;
   5594 			}
   5595 		} else if (typ == NDB_ELEMENT_ID) {
   5596 			if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf)))
   5597 				return 0;
   5598 			if (!ndb_filter_add_id_element(filter, hexbuf))
   5599 				return 0;
   5600 		} else if (typ == NDB_ELEMENT_STRING) {
   5601 			if (!ndb_filter_add_str_element_len(filter, start, tok_len))
   5602 				return 0;
   5603 		} else {
   5604 			// ???
   5605 			return 0;
   5606 		}
   5607 	}
   5608 
   5609 	parser->i--;
   5610 	return 1;
   5611 }
   5612 
   5613 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser,
   5614 				     struct ndb_filter *filter)
   5615 {
   5616 	jsmntok_t *tok;
   5617 	const char *start;
   5618 	int tok_len;
   5619 	unsigned int value;
   5620 
   5621 	tok = &parser->toks[parser->i];
   5622 	start = parser->json + tok->start;
   5623 	tok_len = toksize(tok);
   5624 
   5625 	if (tok->type != JSMN_PRIMITIVE)
   5626 		return 0;
   5627 
   5628 	if (!parse_unsigned_int(start, tok_len, &value))
   5629 		return 0;
   5630 
   5631 	if (!ndb_filter_add_int_element(filter, (uint64_t)value))
   5632 		return 0;
   5633 
   5634 	ndb_debug("added int elem %d\n", value);
   5635 
   5636 	return 1;
   5637 }
   5638 
   5639 
   5640 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser,
   5641 				      struct ndb_filter *filter)
   5642 {
   5643 	jsmntok_t *tok;
   5644 	int size, i;
   5645 
   5646 	tok = &parser->toks[parser->i++];
   5647 
   5648 	if (tok->type != JSMN_ARRAY)
   5649 		return 0;
   5650 
   5651 	size = tok->size;
   5652 
   5653 	for (i = 0; i < size; parser->i++, i++) {
   5654 		if (!ndb_filter_parse_json_int(parser, filter))
   5655 			return 0;
   5656 	}
   5657 
   5658 	parser->i--;
   5659 	return 1;
   5660 }
   5661 
   5662 
   5663 static int ndb_filter_parse_json(struct ndb_json_parser *parser,
   5664 				 struct ndb_filter *filter)
   5665 {
   5666 	jsmntok_t *tok = NULL;
   5667 	const char *json = parser->json;
   5668 	const char *start;
   5669 	char tag;
   5670 	int tok_len;
   5671 	enum ndb_filter_fieldtype field;
   5672 
   5673 	if (parser->toks[parser->i++].type != JSMN_OBJECT)
   5674 		return 0;
   5675 
   5676 	for (; parser->i < parser->num_tokens; parser->i++) {
   5677 		tok = &parser->toks[parser->i++];
   5678 		start = json + tok->start;
   5679 		tok_len = toksize(tok);
   5680 
   5681 		if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) {
   5682 			ndb_debug("failed field '%.*s'\n", tok_len, start);
   5683 			continue;
   5684 		}
   5685 
   5686 		if (tag) {
   5687 			ndb_debug("starting tag field '%c'\n", tag);
   5688 			if (!ndb_filter_start_tag_field(filter, tag)) {
   5689 				ndb_debug("failed to start tag field '%c'\n", tag);
   5690 				return 0;
   5691 			}
   5692 		} else if (!ndb_filter_start_field(filter, field)) {
   5693 			ndb_debug("field already started\n");
   5694 			return 0;
   5695 		}
   5696 
   5697 		// we parsed a top-level field
   5698 		switch(field) {
   5699 		case NDB_FILTER_AUTHORS:
   5700 		case NDB_FILTER_IDS:
   5701 			if (!ndb_filter_parse_json_ids(parser, filter)) {
   5702 				ndb_debug("failed to parse filter ids/authors\n");
   5703 				return 0;
   5704 			}
   5705 			break;
   5706 		case NDB_FILTER_SINCE:
   5707 		case NDB_FILTER_UNTIL:
   5708 		case NDB_FILTER_LIMIT:
   5709 			if (!ndb_filter_parse_json_int(parser, filter)) {
   5710 				ndb_debug("failed to parse filter since/until/limit\n");
   5711 				return 0;
   5712 			}
   5713 			break;
   5714 		case NDB_FILTER_KINDS:
   5715 			if (!ndb_filter_parse_json_ints(parser, filter)) {
   5716 				ndb_debug("failed to parse filter kinds\n");
   5717 				return 0;
   5718 			}
   5719 			break;
   5720 		case NDB_FILTER_TAGS:
   5721 			if (!ndb_filter_parse_json_elems(parser, filter)) {
   5722 				ndb_debug("failed to parse filter tags\n");
   5723 				return 0;
   5724 			}
   5725 			break;
   5726 		}
   5727 
   5728 		ndb_filter_end_field(filter);
   5729 	}
   5730 
   5731 	return ndb_filter_end(filter);
   5732 }
   5733 
   5734 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note)
   5735 {
   5736 	jsmntok_t *tok = NULL;
   5737 	unsigned char hexbuf[64];
   5738 	const char *json = parser->json;
   5739 	const char *start;
   5740 	int i, tok_len, parsed;
   5741 
   5742 	parsed = 0;
   5743 
   5744 	if (parser->toks[parser->i].type != JSMN_OBJECT)
   5745 		return 0;
   5746 
   5747 	// TODO: build id buffer and verify at end
   5748 
   5749 	for (i = parser->i + 1; i < parser->num_tokens; i++) {
   5750 		tok = &parser->toks[i];
   5751 		start = json + tok->start;
   5752 		tok_len = toksize(tok);
   5753 
   5754 		//printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type);
   5755 		if (tok_len == 0 || i + 1 >= parser->num_tokens)
   5756 			continue;
   5757 
   5758 		if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) {
   5759 			// pubkey
   5760 			tok = &parser->toks[i+1];
   5761 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   5762 			parsed |= NDB_PARSED_PUBKEY;
   5763 			ndb_builder_set_pubkey(&parser->builder, hexbuf);
   5764 		} else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') {
   5765 			// id
   5766 			tok = &parser->toks[i+1];
   5767 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   5768 			parsed |= NDB_PARSED_ID;
   5769 			ndb_builder_set_id(&parser->builder, hexbuf);
   5770 		} else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') {
   5771 			// sig
   5772 			tok = &parser->toks[i+1];
   5773 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   5774 			parsed |= NDB_PARSED_SIG;
   5775 			ndb_builder_set_sig(&parser->builder, hexbuf);
   5776 		} else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) {
   5777 			// kind
   5778 			tok = &parser->toks[i+1];
   5779 			start = json + tok->start;
   5780 			if (tok->type != JSMN_PRIMITIVE || tok_len <= 0)
   5781 				return 0;
   5782 			if (!parse_unsigned_int(start, toksize(tok),
   5783 						&parser->builder.note->kind))
   5784 					return 0;
   5785 			parsed |= NDB_PARSED_KIND;
   5786 		} else if (start[0] == 'c') {
   5787 			if (jsoneq(json, tok, tok_len, "created_at")) {
   5788 				// created_at
   5789 				tok = &parser->toks[i+1];
   5790 				start = json + tok->start;
   5791 				if (tok->type != JSMN_PRIMITIVE || tok_len <= 0)
   5792 					return 0;
   5793 				// TODO: update to int64 in 2106 ... xD
   5794 				unsigned int bigi;
   5795 				if (!parse_unsigned_int(start, toksize(tok), &bigi))
   5796 					return 0;
   5797 				parser->builder.note->created_at = bigi;
   5798 				parsed |= NDB_PARSED_CREATED_AT;
   5799 			} else if (jsoneq(json, tok, tok_len, "content")) {
   5800 				// content
   5801 				tok = &parser->toks[i+1];
   5802 				union ndb_packed_str pstr;
   5803 				tok_len = toksize(tok);
   5804 				int written, pack_ids = 0;
   5805 				if (!ndb_builder_make_json_str(&parser->builder,
   5806 							json + tok->start,
   5807 							tok_len, &pstr,
   5808 							&written, pack_ids)) {
   5809 					ndb_debug("ndb_builder_make_json_str failed\n");
   5810 					return 0;
   5811 				}
   5812 				parser->builder.note->content_length = written;
   5813 				parser->builder.note->content = pstr;
   5814 				parsed |= NDB_PARSED_CONTENT;
   5815 			}
   5816 		} else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) {
   5817 			tok = &parser->toks[i+1];
   5818 			ndb_builder_process_json_tags(parser, tok);
   5819 			i += tok->size;
   5820 			parsed |= NDB_PARSED_TAGS;
   5821 		}
   5822 	}
   5823 
   5824 	//ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL);
   5825 	if (parsed != NDB_PARSED_ALL)
   5826 		return 0;
   5827 
   5828 	return ndb_builder_finalize(&parser->builder, note, NULL);
   5829 }
   5830 
   5831 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter,
   5832 			 unsigned char *buf, int bufsize)
   5833 {
   5834 	struct ndb_json_parser parser;
   5835 	int res;
   5836 
   5837 	if (filter->finalized)
   5838 		return 0;
   5839 
   5840 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   5841 	if ((res = ndb_json_parser_parse(&parser, NULL)) < 0)
   5842 		return res;
   5843 
   5844 	if (parser.num_tokens < 1)
   5845 		return 0;
   5846 
   5847 	return ndb_filter_parse_json(&parser, filter);
   5848 }
   5849 
   5850 int ndb_note_from_json(const char *json, int len, struct ndb_note **note,
   5851 		       unsigned char *buf, int bufsize)
   5852 {
   5853 	struct ndb_json_parser parser;
   5854 	int res;
   5855 
   5856 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   5857 	if ((res = ndb_json_parser_parse(&parser, NULL)) < 0)
   5858 		return res;
   5859 
   5860 	if (parser.num_tokens < 1)
   5861 		return 0;
   5862 
   5863 	return ndb_parse_json_note(&parser, note);
   5864 }
   5865 
   5866 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey)
   5867 {
   5868 	memcpy(builder->note->pubkey, pubkey, 32);
   5869 }
   5870 
   5871 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id)
   5872 {
   5873 	memcpy(builder->note->id, id, 32);
   5874 }
   5875 
   5876 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig)
   5877 {
   5878 	memcpy(builder->note->sig, sig, 64);
   5879 }
   5880 
   5881 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind)
   5882 {
   5883 	builder->note->kind = kind;
   5884 }
   5885 
   5886 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at)
   5887 {
   5888 	builder->note->created_at = created_at;
   5889 }
   5890 
   5891 int ndb_builder_new_tag(struct ndb_builder *builder)
   5892 {
   5893 	builder->note->tags.count++;
   5894 	struct ndb_tag tag = {0};
   5895 	builder->current_tag = (struct ndb_tag *)builder->note_cur.p;
   5896 	return cursor_push_tag(&builder->note_cur, &tag);
   5897 }
   5898 
   5899 void ndb_stat_counts_init(struct ndb_stat_counts *counts)
   5900 {
   5901 	counts->count = 0;
   5902 	counts->key_size = 0;
   5903 	counts->value_size = 0;
   5904 }
   5905 
   5906 static void ndb_stat_init(struct ndb_stat *stat)
   5907 {
   5908 	// init stats
   5909 	int i;
   5910 
   5911 	for (i = 0; i < NDB_CKIND_COUNT; i++) {
   5912 		ndb_stat_counts_init(&stat->common_kinds[i]);
   5913 	}
   5914 
   5915 	for (i = 0; i < NDB_DBS; i++) {
   5916 		ndb_stat_counts_init(&stat->dbs[i]);
   5917 	}
   5918 
   5919 	ndb_stat_counts_init(&stat->other_kinds);
   5920 }
   5921 
   5922 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat)
   5923 {
   5924 	int rc;
   5925 	MDB_cursor *cur;
   5926 	MDB_val k, v;
   5927 	MDB_dbi db;
   5928 	struct ndb_txn txn;
   5929 	struct ndb_note *note;
   5930 	int i;
   5931 	enum ndb_common_kind common_kind;
   5932 
   5933 	// initialize to 0
   5934 	ndb_stat_init(stat);
   5935 
   5936 	if (!ndb_begin_query(ndb, &txn)) {
   5937 		fprintf(stderr, "ndb_stat failed at ndb_begin_query\n");
   5938 		return 0;
   5939 	}
   5940 
   5941 	// stat each dbi in the database
   5942 	for (i = 0; i < NDB_DBS; i++)
   5943 	{
   5944 		db = ndb->lmdb.dbs[i];
   5945 
   5946 		if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) {
   5947 			fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n",
   5948 					mdb_strerror(rc));
   5949 			return 0;
   5950 		}
   5951 
   5952 		// loop over every entry and count kv sizes
   5953 		while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   5954 			// we gather more detailed per-kind stats if we're in
   5955 			// the notes db
   5956 			if (i == NDB_DB_NOTE) {
   5957 				note = v.mv_data;
   5958 				common_kind = ndb_kind_to_common_kind(note->kind);
   5959 
   5960 				// uncommon kind? just count them in bulk
   5961 				if ((int)common_kind == -1) {
   5962 					stat->other_kinds.count++;
   5963 					stat->other_kinds.key_size += k.mv_size;
   5964 					stat->other_kinds.value_size += v.mv_size;
   5965 				} else {
   5966 					stat->common_kinds[common_kind].count++;
   5967 					stat->common_kinds[common_kind].key_size += k.mv_size;
   5968 					stat->common_kinds[common_kind].value_size += v.mv_size;
   5969 				}
   5970 			}
   5971 
   5972 			stat->dbs[i].count++;
   5973 			stat->dbs[i].key_size += k.mv_size;
   5974 			stat->dbs[i].value_size += v.mv_size;
   5975 		}
   5976 
   5977 		// close the cursor, they are per-dbi
   5978 		mdb_cursor_close(cur);
   5979 	}
   5980 
   5981 	ndb_end_query(&txn);
   5982 
   5983 	return 1;
   5984 }
   5985 
   5986 /// Push an element to the current tag
   5987 ///
   5988 /// Basic idea is to call ndb_builder_new_tag
   5989 inline int ndb_builder_push_tag_str(struct ndb_builder *builder,
   5990 				    const char *str, int len)
   5991 {
   5992 	union ndb_packed_str pstr;
   5993 	int pack_ids = 1;
   5994 	if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids))
   5995 		return 0;
   5996 	return ndb_builder_finalize_tag(builder, pstr);
   5997 }
   5998 
   5999 //
   6000 // CONFIG
   6001 //
   6002 void ndb_default_config(struct ndb_config *config)
   6003 {
   6004 	int cores = get_cpu_cores();
   6005 	config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB
   6006 	config->ingester_threads = cores == -1 ? 4 : cores;
   6007 	config->flags = 0;
   6008 	config->ingest_filter = NULL;
   6009 	config->filter_context = NULL;
   6010 	config->sub_cb_ctx = NULL;
   6011 	config->sub_cb = NULL;
   6012 }
   6013 
   6014 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context)
   6015 {
   6016 	config->sub_cb_ctx = context;
   6017 	config->sub_cb = fn;
   6018 }
   6019 
   6020 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
   6021 {
   6022 	config->ingester_threads = threads;
   6023 }
   6024 
   6025 void ndb_config_set_flags(struct ndb_config *config, int flags)
   6026 {
   6027 	config->flags = flags;
   6028 }
   6029 
   6030 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize)
   6031 {
   6032 	config->mapsize = mapsize;
   6033 }
   6034 
   6035 void ndb_config_set_ingest_filter(struct ndb_config *config,
   6036 				  ndb_ingest_filter_fn fn, void *filter_ctx)
   6037 {
   6038 	config->ingest_filter = fn;
   6039 	config->filter_context = filter_ctx;
   6040 }
   6041 
   6042 int ndb_print_tag_index(struct ndb_txn *txn)
   6043 {
   6044 	MDB_cursor *cur;
   6045 	MDB_val k, v;
   6046 	int i;
   6047 
   6048 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur))
   6049 		return 0;
   6050 
   6051 	i = 1;
   6052 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6053 		printf("%d ", i);
   6054 		print_tag_kv(txn, &k, &v);
   6055 		i++;
   6056 	}
   6057 
   6058 	return 1;
   6059 }
   6060 
   6061 int ndb_print_kind_keys(struct ndb_txn *txn)
   6062 {
   6063 	MDB_cursor *cur;
   6064 	MDB_val k, v;
   6065 	int i;
   6066 	struct ndb_u64_tsid *tsid;
   6067 
   6068 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur))
   6069 		return 0;
   6070 
   6071 	i = 1;
   6072 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6073 		tsid = k.mv_data;
   6074 		printf("%d note_kind %" PRIu64 " %" PRIu64 "\n",
   6075 			i, tsid->u64, tsid->timestamp);
   6076 
   6077 		i++;
   6078 	}
   6079 
   6080 	return 1;
   6081 }
   6082 
   6083 // used by ndb.c
   6084 int ndb_print_search_keys(struct ndb_txn *txn)
   6085 {
   6086 	MDB_cursor *cur;
   6087 	MDB_val k, v;
   6088 	int i;
   6089 	struct ndb_text_search_key search_key;
   6090 
   6091 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur))
   6092 		return 0;
   6093 
   6094 	i = 1;
   6095 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6096 		if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) {
   6097 			fprintf(stderr, "error decoding key %d\n", i);
   6098 			continue;
   6099 		}
   6100 
   6101 		ndb_print_text_search_key(&search_key);
   6102 		printf("\n");
   6103 
   6104 		i++;
   6105 	}
   6106 
   6107 	return 1;
   6108 }
   6109 
   6110 struct ndb_tags *ndb_note_tags(struct ndb_note *note)
   6111 {
   6112 	return &note->tags;
   6113 }
   6114 
   6115 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr)
   6116 {
   6117 	struct ndb_str str;
   6118 	str.flag = pstr->packed.flag;
   6119 
   6120 	if (str.flag == NDB_PACKED_STR) {
   6121 		str.str = pstr->packed.str;
   6122 		return str;
   6123 	}
   6124 
   6125 	str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF);
   6126 	return str;
   6127 }
   6128 
   6129 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind)
   6130 {
   6131 	return ndb_note_str(note, &tag->strs[ind]);
   6132 }
   6133 
   6134 int ndb_str_len(struct ndb_str *str)
   6135 {
   6136 	if (str->flag == NDB_PACKED_ID)
   6137 		return 32;
   6138 	return strlen(str->str);
   6139 }
   6140 
   6141 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind)
   6142 {
   6143 	return ndb_tag_str(iter->note, iter->tag, ind);
   6144 }
   6145 
   6146 unsigned char * ndb_note_id(struct ndb_note *note)
   6147 {
   6148 	return note->id;
   6149 }
   6150 
   6151 unsigned char * ndb_note_pubkey(struct ndb_note *note)
   6152 {
   6153 	return note->pubkey;
   6154 }
   6155 
   6156 unsigned char * ndb_note_sig(struct ndb_note *note)
   6157 {
   6158 	return note->sig;
   6159 }
   6160 
   6161 uint32_t ndb_note_created_at(struct ndb_note *note)
   6162 {
   6163 	return note->created_at;
   6164 }
   6165 
   6166 uint32_t ndb_note_kind(struct ndb_note *note)
   6167 {
   6168 	return note->kind;
   6169 }
   6170 
   6171 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind)
   6172 {
   6173 	note->kind = kind;
   6174 }
   6175 
   6176 const char *ndb_note_content(struct ndb_note *note)
   6177 {
   6178 	return ndb_note_str(note, &note->content).str;
   6179 }
   6180 
   6181 uint32_t ndb_note_content_length(struct ndb_note *note)
   6182 {
   6183 	return note->content_length;
   6184 }
   6185 
   6186 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes)
   6187 {
   6188 	struct ndb_note *note = (struct ndb_note *)bytes;
   6189 	if (note->version != 1)
   6190 		return 0;
   6191 	return note;
   6192 }
   6193 
   6194 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter)
   6195 {
   6196 	iter->note = note;
   6197 	iter->tag = NULL;
   6198 	iter->index = -1;
   6199 }
   6200 
   6201 int ndb_tags_iterate_next(struct ndb_iterator *iter)
   6202 {
   6203 	struct ndb_tags *tags;
   6204 
   6205 	if (iter->tag == NULL || iter->index == -1) {
   6206 		iter->tag = iter->note->tags.tag;
   6207 		iter->index = 0;
   6208 		return iter->note->tags.count != 0;
   6209 	}
   6210 
   6211 	tags = &iter->note->tags;
   6212 
   6213 	if (++iter->index < tags->count) {
   6214 		uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]);
   6215 		iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size);
   6216 		return 1;
   6217 	}
   6218 
   6219 	return 0;
   6220 }
   6221 
   6222 uint16_t ndb_tags_count(struct ndb_tags *tags)
   6223 {
   6224 	return tags->count;
   6225 }
   6226 
   6227 uint16_t ndb_tag_count(struct ndb_tag *tags)
   6228 {
   6229 	return tags->count;
   6230 }
   6231 
   6232 enum ndb_common_kind ndb_kind_to_common_kind(int kind)
   6233 {
   6234 	switch (kind)
   6235 	{
   6236 		case 0:     return NDB_CKIND_PROFILE;
   6237 		case 1:     return NDB_CKIND_TEXT;
   6238 		case 3:     return NDB_CKIND_CONTACTS;
   6239 		case 4:     return NDB_CKIND_DM;
   6240 		case 5:     return NDB_CKIND_DELETE;
   6241 		case 6:     return NDB_CKIND_REPOST;
   6242 		case 7:     return NDB_CKIND_REACTION;
   6243 		case 9735:  return NDB_CKIND_ZAP;
   6244 		case 9734:  return NDB_CKIND_ZAP_REQUEST;
   6245 		case 23194: return NDB_CKIND_NWC_REQUEST;
   6246 		case 23195: return NDB_CKIND_NWC_RESPONSE;
   6247 		case 27235: return NDB_CKIND_HTTP_AUTH;
   6248 		case 30000: return NDB_CKIND_LIST;
   6249 		case 30023: return NDB_CKIND_LONGFORM;
   6250 		case 30315: return NDB_CKIND_STATUS;
   6251 	}
   6252 
   6253 	return -1;
   6254 }
   6255 
   6256 const char *ndb_kind_name(enum ndb_common_kind ck)
   6257 {
   6258 	switch (ck) {
   6259 		case NDB_CKIND_PROFILE:      return "profile";
   6260 		case NDB_CKIND_TEXT:         return "text";
   6261 		case NDB_CKIND_CONTACTS:     return "contacts";
   6262 		case NDB_CKIND_DM:           return "dm";
   6263 		case NDB_CKIND_DELETE:       return "delete";
   6264 		case NDB_CKIND_REPOST:       return "repost";
   6265 		case NDB_CKIND_REACTION:     return "reaction";
   6266 		case NDB_CKIND_ZAP:          return "zap";
   6267 		case NDB_CKIND_ZAP_REQUEST:  return "zap_request";
   6268 		case NDB_CKIND_NWC_REQUEST:  return "nwc_request";
   6269 		case NDB_CKIND_NWC_RESPONSE: return "nwc_response";
   6270 		case NDB_CKIND_HTTP_AUTH:    return "http_auth";
   6271 		case NDB_CKIND_LIST:         return "list";
   6272 		case NDB_CKIND_LONGFORM:     return "longform";
   6273 		case NDB_CKIND_STATUS:       return "status";
   6274 		case NDB_CKIND_COUNT:        return "unknown";
   6275 	}
   6276 
   6277 	return "unknown";
   6278 }
   6279 
   6280 const char *ndb_db_name(enum ndb_dbs db)
   6281 {
   6282 	switch (db) {
   6283 		case NDB_DB_NOTE:
   6284 			return "note";
   6285 		case NDB_DB_META:
   6286 			return "note_metadata";
   6287 		case NDB_DB_PROFILE:
   6288 			return "profile";
   6289 		case NDB_DB_NOTE_ID:
   6290 			return "note_index";
   6291 		case NDB_DB_PROFILE_PK:
   6292 			return "profile_pubkey_index";
   6293 		case NDB_DB_NDB_META:
   6294 			return "nostrdb_metadata";
   6295 		case NDB_DB_PROFILE_SEARCH:
   6296 			return "profile_search";
   6297 		case NDB_DB_PROFILE_LAST_FETCH:
   6298 			return "profile_last_fetch";
   6299 		case NDB_DB_NOTE_KIND:
   6300 			return "note_kind_index";
   6301 		case NDB_DB_NOTE_TEXT:
   6302 			return "note_fulltext";
   6303 		case NDB_DB_NOTE_BLOCKS:
   6304 			return "note_blocks";
   6305 		case NDB_DB_NOTE_TAGS:
   6306 			return "note_tags";
   6307 		case NDB_DBS:
   6308 			return "count";
   6309 	}
   6310 
   6311 	return "unknown";
   6312 }
   6313 
   6314 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note)
   6315 {
   6316 	const char *content;
   6317 	size_t content_len;
   6318 	struct ndb_blocks *blocks;
   6319 
   6320 	content = ndb_note_content(note);
   6321 	content_len = ndb_note_content_length(note);
   6322 
   6323 	// something weird is going on
   6324 	if (content_len >= INT32_MAX)
   6325 		return NULL;
   6326 
   6327 	unsigned char *buffer = malloc(content_len);
   6328 	if (!buffer)
   6329 		return NULL;
   6330 
   6331 	if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) {
   6332 		free(buffer);
   6333 		return NULL;
   6334 	}
   6335 
   6336 	//blocks = realloc(blocks, ndb_blocks_total_size(blocks));
   6337 	//if (blocks == NULL)
   6338 		//return NULL;
   6339 
   6340 	blocks->flags |= NDB_BLOCK_FLAG_OWNED;
   6341 
   6342 	return blocks;
   6343 }
   6344 
   6345 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key)
   6346 {
   6347 	struct ndb_blocks *blocks, *blocks_to_writer;
   6348 	size_t blocks_size;
   6349 	struct ndb_note *note;
   6350 	size_t note_len;
   6351 
   6352 	if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, &note_len))) {
   6353 		return blocks;
   6354 	}
   6355 
   6356 	// If we don't have note blocks, let's lazily generate them. This is
   6357 	// migration-friendly instead of doing them all at once
   6358 	if (!(note = ndb_get_note_by_key(txn, note_key, &note_len))) {
   6359 		// no note found, can't return note blocks
   6360 		return NULL;
   6361 	}
   6362 
   6363 	 if (!(blocks = ndb_note_to_blocks(note)))
   6364 		 return NULL;
   6365 
   6366 	 // send a copy to the writer
   6367 	 blocks_size = ndb_blocks_total_size(blocks);
   6368 	 blocks_to_writer = malloc(blocks_size);
   6369 	 memcpy(blocks_to_writer, blocks, blocks_size);
   6370 	 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED);
   6371 
   6372 	 // we generated new blocks, let's store them in the DB
   6373 	 struct ndb_writer_blocks write_blocks = {
   6374 		 .blocks = blocks_to_writer,
   6375 		 .note_key = note_key
   6376 	 };
   6377 
   6378 	 assert(write_blocks.blocks != blocks);
   6379 
   6380 	 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS };
   6381 	 msg.blocks = write_blocks;
   6382 
   6383 	 ndb_writer_queue_msg(&ndb->writer, &msg);
   6384 
   6385 	 return blocks;
   6386 }
   6387 
   6388 struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index)
   6389 {
   6390 	struct ndb_subscription *sub, *tsub;
   6391 	int i;
   6392 
   6393 	for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) {
   6394 		tsub = &ndb->monitor.subscriptions[i];
   6395 		if (tsub->subid == subid) {
   6396 			sub = tsub;
   6397 			if (index)
   6398 				*index = i;
   6399 			break;
   6400 		}
   6401 	}
   6402 
   6403 	return sub;
   6404 }
   6405 
   6406 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
   6407 		       int note_id_capacity)
   6408 {
   6409 	struct ndb_subscription *sub;
   6410 
   6411 	if (subid == 0)
   6412 		return 0;
   6413 
   6414 	if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
   6415 		return 0;
   6416 
   6417 	return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
   6418 }
   6419 
   6420 int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
   6421 		       int note_id_capacity)
   6422 {
   6423 	struct ndb_subscription *sub;
   6424 
   6425 	// this is not a valid subscription id
   6426 	if (subid == 0)
   6427 		return 0;
   6428 
   6429 	if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
   6430 		return 0;
   6431 
   6432 	return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
   6433 }
   6434 
   6435 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
   6436 {
   6437 	struct ndb_subscription *sub;
   6438 	int index, elems_to_move;
   6439 
   6440 	if (!(sub = ndb_find_subscription(ndb, subid, &index)))
   6441 		return 0;
   6442 
   6443 	ndb_subscription_destroy(sub);
   6444 
   6445 	elems_to_move = (--ndb->monitor.num_subscriptions) - index;
   6446 
   6447 	memmove(&ndb->monitor.subscriptions[index],
   6448 		&ndb->monitor.subscriptions[index+1],
   6449 		elems_to_move * sizeof(*sub));
   6450 
   6451 	return 1;
   6452 }
   6453 
   6454 int ndb_num_subscriptions(struct ndb *ndb)
   6455 {
   6456 	return ndb->monitor.num_subscriptions;
   6457 }
   6458 
   6459 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters)
   6460 {
   6461 	static uint64_t subids = 0;
   6462 	struct ndb_subscription *sub;
   6463 	int index;
   6464 	size_t buflen;
   6465 	uint64_t subid;
   6466 	char *buf;
   6467 
   6468 	if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
   6469 		fprintf(stderr, "too many subscriptions\n");
   6470 		return 0;
   6471 	}
   6472 
   6473 	index = ndb->monitor.num_subscriptions++;
   6474 	sub = &ndb->monitor.subscriptions[index];
   6475 	subid = ++subids;
   6476 	sub->subid = subid;
   6477 
   6478 	ndb_filter_group_init(&sub->group);
   6479 	if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters))
   6480 		return 0;
   6481 
   6482 	// 500k ought to be enough for anyone
   6483 	buflen = sizeof(uint64_t) * 65536;
   6484 	buf = malloc(buflen);
   6485 
   6486 	if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
   6487 		fprintf(stderr, "failed to push prot queue\n");
   6488 		return 0;
   6489 	}
   6490 
   6491 	return subid;
   6492 }