nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

nostrdb.c (113574B)


      1 
      2 #include "nostrdb.h"
      3 #include "jsmn.h"
      4 #include "hex.h"
      5 #include "cursor.h"
      6 #include "random.h"
      7 #include "sha256.h"
      8 #include "lmdb.h"
      9 #include "util.h"
     10 #include "cpu.h"
     11 #include "threadpool.h"
     12 #include "protected_queue.h"
     13 #include "memchr.h"
     14 #include <stdlib.h>
     15 #include <limits.h>
     16 #include <assert.h>
     17 
     18 #include "bindings/c/profile_json_parser.h"
     19 #include "bindings/c/profile_builder.h"
     20 #include "bindings/c/meta_builder.h"
     21 #include "bindings/c/meta_reader.h"
     22 #include "bindings/c/profile_verifier.h"
     23 #include "secp256k1.h"
     24 #include "secp256k1_ecdh.h"
     25 #include "secp256k1_schnorrsig.h"
     26 
// NOTE: function-like macros; an argument may be evaluated twice, so do not
// pass expressions with side effects (e.g. max(i++, j)).
#define max(a,b) ((a) > (b) ? (a) : (b))
#define min(a,b) ((a) < (b) ? (a) : (b))

// the maximum number of things threads pop and push in bulk
static const int THREAD_QUEUE_BATCH = 4096;

// the maximum size of inbox queues
static const int DEFAULT_QUEUE_SIZE = 1000000;

// Total number of pages allocated for a filter's buffer (ndb_filter_init
// multiplies this by its assumed page size).
// increase if we need bigger filters
#define NDB_FILTER_PAGES 64
     38 
     39 #define ndb_flag_set(flags, f) ((flags & f) == f)
     40 
// Bit flags, one per note field. NDB_PARSED_ALL is the union of all of them.
// NOTE(review): presumably set as each field is parsed from json — confirm
// against the parser.
#define NDB_PARSED_ID           (1 << 0)
#define NDB_PARSED_PUBKEY       (1 << 1)
#define NDB_PARSED_SIG          (1 << 2)
#define NDB_PARSED_CREATED_AT   (1 << 3)
#define NDB_PARSED_KIND         (1 << 4)
#define NDB_PARSED_CONTENT      (1 << 5)
#define NDB_PARSED_TAGS         (1 << 6)
#define NDB_PARSED_ALL          (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)

// Signature of a database migration step. The migrations in this file return
// 1 on success and 0 on failure.
typedef int (*ndb_migrate_fn)(struct ndb *);
// Callback invoked with a word, its length and its index.
// NOTE(review): the first argument looks like caller-supplied closure data —
// confirm at the call sites.
typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len,
				  int word_index);
     53 
// these must be byte-aligned, they are directly accessing the serialized data
// representation
#pragma pack(push, 1)


// A 4-byte string slot that can be viewed three ways: up to three inline
// characters plus a flag byte, a 32-bit offset, or raw bytes.
// NOTE(review): presumably `offset` indexes the note's string section (see
// `strings` in struct ndb_note) — confirm.
union ndb_packed_str {
	struct {
		char str[3];
		// we assume little endian everywhere. sorry not sorry.
		unsigned char flag; // NDB_PACKED_STR, etc
	} packed;

	uint32_t offset;
	unsigned char bytes[4];
};

// A single tag: a count followed inline by its string slots.
struct ndb_tag {
	uint16_t count;
	union ndb_packed_str strs[0];
};

// Header of a note's tag list; the tags themselves follow inline.
struct ndb_tags {
	uint16_t padding;
	uint16_t count;
	struct ndb_tag tag[0];
};

// v1
// Serialized note record. Field order and sizes are part of the stored
// format (the struct is packed), so they must not be rearranged.
struct ndb_note {
	unsigned char version;    // v=1
	unsigned char padding[3]; // keep things aligned
	unsigned char id[32];
	unsigned char pubkey[32];
	unsigned char sig[64];

	uint64_t created_at;
	uint32_t kind;
	uint32_t content_length;
	union ndb_packed_str content;
	uint32_t strings;
	// nothing can come after tags since it contains variadic data
	struct ndb_tags tags;
};

#pragma pack(pop)
     99 
// One migration step, wrapping the function that performs it.
struct ndb_migration {
	ndb_migrate_fn fn;
};
    103 
// Pairs a flatcc builder with a flatbuffer pointer for a profile record.
// NOTE(review): presumably `flatbuf` is the finished buffer produced by
// `builder` — confirm ownership at the call sites.
struct ndb_profile_record_builder {
	flatcc_builder_t *builder;
	void *flatbuf;
};
    108 
// controls whether to continue or stop the json parser
enum ndb_idres {
	NDB_IDRES_CONT, // keep parsing
	NDB_IDRES_STOP, // stop parsing
};
    114 
// closure data for the id-detecting ingest controller
struct ndb_ingest_controller
{
	MDB_txn *read_txn;     // lmdb transaction (read, going by the name)
	struct ndb_lmdb *lmdb; // environment and database handles
};
    121 
// Kinds of messages understood by the writer.
// NOTE(review): presumably these are delivered through the writer's inbox
// queue — confirm against the writer thread.
enum ndb_writer_msgtype {
	NDB_WRITER_QUIT, // kill thread immediately
	NDB_WRITER_NOTE, // write a note to the db
	NDB_WRITER_PROFILE, // write a profile to the db
	NDB_WRITER_DBMETA, // write ndb metadata
	NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
};
    129 
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
	NDB_META_KEY_VERSION = 1 // key under which ndb_db_version looks up the version
};
    134 
// State for parsing a json document with jsmn while building a note.
// NOTE(review): field roles below are inferred from names and types —
// confirm against the parser code.
struct ndb_json_parser {
	const char *json;           // input text
	int json_len;               // length of `json`
	struct ndb_builder builder; // note builder
	jsmn_parser json_parser;    // jsmn tokenizer state
	jsmntok_t *toks, *toks_end; // token buffer bounds
	int i;
	int num_tokens;
};
    144 
// useful to pass to threads on its own
// Bundles the lmdb environment with its database handles.
struct ndb_lmdb {
	MDB_env *env;
	MDB_dbi dbs[NDB_DBS]; // indexed by the NDB_DB_* constants
};
    150 
// State for the writer thread.
// NOTE(review): presumably `queue_buf`/`queue_buflen` describe the memory
// backing `inbox` — confirm where the writer is initialized.
struct ndb_writer {
	struct ndb_lmdb *lmdb;

	void *queue_buf;
	int queue_buflen;
	pthread_t thread_id;

	struct prot_queue inbox;
};
    160 
// State for the ingester: a thread pool, the writer it refers to, and an
// optional filter callback with its context pointer.
// NOTE(review): how `filter` is consulted is not visible here — confirm.
struct ndb_ingester {
	uint32_t flags;
	struct threadpool tp;
	struct ndb_writer *writer;
	void *filter_context;
	ndb_ingest_filter_fn filter;
};
    168 
    169 
// Top-level database handle: lmdb handles plus the ingester and writer.
struct ndb {
	struct ndb_lmdb lmdb;
	struct ndb_ingester ingester;
	struct ndb_writer writer;
	int version;
	uint32_t flags; // setting flags
	// lmdb environ handles, etc
};
    178 
    179 
// A clustered key with an id and a timestamp
struct ndb_tsid {
	unsigned char id[32]; // 32-byte id
	uint64_t timestamp;
};
    185 
// A u64 + timestamp id. Just using this for kinds at the moment.
// Ordered by ndb_u64_tsid_compare: first by `u64`, then by `timestamp`.
struct ndb_u64_tsid {
	uint64_t u64; // kind, etc
	uint64_t timestamp;
};
    191 
// A word given as pointer + length.
// NOTE(review): presumably not NUL-terminated, since a length is carried —
// confirm.
struct ndb_word
{
	const char *word;
	int word_len;
};
    197 
// Fixed-capacity list of words; `num_words` counts the entries in use.
struct ndb_search_words
{
	struct ndb_word words[MAX_TEXT_SEARCH_WORDS];
	int num_words;
};
    203 
    204 // ndb_text_search_key
    205 //
    206 // This is compressed when in lmdb:
    207 //
    208 //   note_id:    varint
    209 //   strlen:     varint
    210 //   str:        cstr
    211 //   timestamp:  varint
    212 //   word_index: varint
    213 //   
    214 static int ndb_make_text_search_key(unsigned char *buf, int bufsize,
    215 				    int word_index, int word_len, const char *str,
    216 				    uint64_t timestamp, uint64_t note_id,
    217 				    int *keysize)
    218 {
    219 	struct cursor cur;
    220 	int size, pad;
    221 	make_cursor(buf, buf + bufsize, &cur);
    222 
    223 	// TODO: need update this to uint64_t
    224 	// we push this first because our query function can pull this off
    225 	// quicky to check matches
    226 	if (!push_varint(&cur, (int32_t)note_id))
    227 		return 0;
    228 
    229 	// string length
    230 	if (!push_varint(&cur, word_len))
    231 		return 0;
    232 
    233 	// non-null terminated, lowercase string
    234 	if (!cursor_push_lowercase(&cur, str, word_len))
    235 		return 0;
    236 
    237 	// TODO: need update this to uint64_t
    238 	if (!push_varint(&cur, (int)timestamp))
    239 		return 0;
    240 
    241 	// the index of the word in the content so that we can do more accurate
    242 	// phrase searches
    243 	if (!push_varint(&cur, word_index))
    244 		return 0;
    245 
    246 	size = cur.p - cur.start;
    247 
    248 	// pad to 8-byte alignment
    249 	pad = ((size + 7) & ~7) - size;
    250 	if (pad > 0) {
    251 		if (!cursor_memset(&cur, 0, pad)) {
    252 			return 0;
    253 		}
    254 	}
    255 
    256 	*keysize = cur.p - cur.start;
    257 	assert((*keysize % 8) == 0);
    258 
    259 	return 1;
    260 }
    261 
    262 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize,
    263 					  int wordlen, const char *word,
    264 					  uint64_t timestamp, uint64_t note_id,
    265 					  int *keysize)
    266 {
    267 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    268 					timestamp, note_id, keysize);
    269 }
    270 
    271 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize,
    272 					int wordlen, const char *word,
    273 					int *keysize)
    274 {
    275 	uint64_t timestamp, note_id;
    276 	timestamp = 0;
    277 	note_id = 0;
    278 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    279 					timestamp, note_id, keysize);
    280 }
    281 
    282 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize,
    283 					 int wordlen, const char *word,
    284 					 int *keysize)
    285 {
    286 	uint64_t timestamp, note_id;
    287 	timestamp = INT32_MAX;
    288 	note_id = INT32_MAX;
    289 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    290 					timestamp, note_id, keysize);
    291 }
    292 
// Signature shared by ndb_make_text_search_key_low and
// ndb_make_text_search_key_high: builds a key for `word` into `buf` and
// reports its size through `keysize`.
typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize);
    294 
    295 /** From LMDB: Compare two items lexically */
    296 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) {
    297 	int diff;
    298 	ssize_t len_diff;
    299 	unsigned int len;
    300 
    301 	len = a->mv_size;
    302 	len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size;
    303 	if (len_diff > 0) {
    304 		len = b->mv_size;
    305 		len_diff = 1;
    306 	}
    307 
    308 	diff = memcmp(a->mv_data, b->mv_data, len);
    309 	return diff ? diff : len_diff<0 ? -1 : len_diff;
    310 }
    311 
    312 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b)
    313 {
    314 	struct cursor ca, cb;
    315 	int sa, sb;
    316 	int nid_a, nid_b;
    317 	MDB_val a2, b2;
    318 
    319 	make_cursor(a->mv_data, a->mv_data + a->mv_size, &ca);
    320 	make_cursor(b->mv_data, b->mv_data + b->mv_size, &cb);
    321 
    322 	// note_id
    323 	if (unlikely(!pull_varint(&ca, &nid_a) || !pull_varint(&cb, &nid_b)))
    324 		return 0;
    325 
    326 	// string size
    327 	if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb)))
    328 		return 0;
    329 
    330 	a2.mv_data = ca.p;
    331 	a2.mv_size = sa;
    332 
    333 	b2.mv_data = cb.p;
    334 	b2.mv_size = sb;
    335 
    336 	int cmp = mdb_cmp_memn(&a2, &b2);
    337 	if (cmp) return cmp;
    338 
    339 	// skip over string
    340 	ca.p += sa;
    341 	cb.p += sb;
    342 
    343 	// timestamp
    344 	if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb)))
    345 		return 0;
    346 
    347 	if      (sa < sb) return -1;
    348 	else if (sa > sb) return 1;
    349 
    350 	// note_id
    351 	if      (nid_a < nid_b) return -1;
    352 	else if (nid_a > nid_b) return 1;
    353 
    354 	// word index
    355 	if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb)))
    356 		return 0;
    357 
    358 	if      (sa < sb) return -1;
    359 	else if (sa > sb) return 1;
    360 
    361 	return 0;
    362 }
    363 
    364 static inline int ndb_unpack_text_search_key_noteid(
    365 		struct cursor *cur, uint64_t *note_id)
    366 {
    367 	int inote_id;
    368 	if (!pull_varint(cur, &inote_id))
    369 		return 0;
    370 
    371 	*note_id = inote_id;
    372 	return 1;
    373 }
    374 
    375 // faster peek of just the string instead of unpacking everything
    376 // this is used to quickly discard range query matches if there is no
    377 // common prefix
    378 static inline int ndb_unpack_text_search_key_string(struct cursor *cur,
    379 						    const char **str,
    380 						    int *str_len)
    381 {
    382 	if (!pull_varint(cur, str_len))
    383 		return 0;
    384 
    385 	*str = (const char *)cur->p;
    386 
    387 	if (!cursor_skip(cur, *str_len))
    388 		return 0;
    389 	
    390 	return 1;
    391 }
    392 
    393 // should be called after ndb_unpack_text_search_key_string. It continues
    394 // the unpacking of a text search key if we've already started it.
    395 static inline int
    396 ndb_unpack_remaining_text_search_key(struct cursor *cur,
    397 				     struct ndb_text_search_key *key)
    398 {
    399 	int timestamp;
    400 
    401 	if (!pull_varint(cur, &timestamp))
    402 		return 0;
    403 
    404 	if (!pull_varint(cur, &key->word_index))
    405 		return 0;
    406 
    407 	key->timestamp = timestamp;
    408 
    409 	return 1;
    410 }
    411 
    412 // unpack a fulltext search key
    413 //
    414 // full version of string + unpack remaining. This is split up because text
    415 // searching only requires to pull the string for prefix searching, and the
    416 // remaining is optional
    417 static inline int ndb_unpack_text_search_key(unsigned char *p, int len,
    418 				      struct ndb_text_search_key *key)
    419 {
    420 	struct cursor c;
    421 	make_cursor(p, p + len, &c);
    422 
    423 	if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id))
    424 		return 0;
    425 
    426 	if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len))
    427 		return 0;
    428 
    429 	return ndb_unpack_remaining_text_search_key(&c, key);
    430 }
    431 
    432 // Copies only lowercase characters to the destination string and fills the rest with null bytes.
    433 // `dst` and `src` are pointers to the destination and source strings, respectively.
    434 // `n` is the maximum number of characters to copy.
    435 static void lowercase_strncpy(char *dst, const char *src, int n) {
    436 	int j = 0, i = 0;
    437 
    438 	if (!dst || !src || n == 0) {
    439 		return;
    440 	}
    441 
    442 	while (src[i] != '\0' && j < n) {
    443 		dst[j++] = tolower(src[i++]);
    444 	}
    445 
    446 	// Null-terminate and fill the destination string
    447 	while (j < n) {
    448 		dst[j++] = '\0';
    449 	}
    450 }
    451 
    452 int ndb_filter_init(struct ndb_filter *filter)
    453 {
    454 	struct cursor cur;
    455 	int page_size, elem_pages, data_pages, buf_size;
    456 
    457 	page_size = 4096; // assuming this, not a big deal if we're wrong
    458 	elem_pages = NDB_FILTER_PAGES / 4;
    459 	data_pages = NDB_FILTER_PAGES - elem_pages;
    460 	buf_size = page_size * NDB_FILTER_PAGES;
    461 
    462 	unsigned char *buf = malloc(buf_size);
    463 	if (!buf)
    464 		return 0;
    465 
    466 	// init memory arena for the cursor
    467 	make_cursor(buf, buf + buf_size, &cur);
    468 
    469 	cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages);
    470 	cursor_slice(&cur, &filter->data_buf, page_size * data_pages);
    471 
    472 	// make sure we are fully allocated
    473 	assert(cur.p == cur.end);
    474 
    475 	// make sure elem_buf is the start of the buffer
    476 	assert(filter->elem_buf.start == cur.start);
    477 
    478 	filter->num_elements = 0;
    479 	filter->elements[0] = (struct ndb_filter_elements*) buf;
    480 	filter->current = NULL;
    481 
    482 	return 1;
    483 }
    484 
    485 void ndb_filter_reset(struct ndb_filter *filter)
    486 {
    487 	filter->num_elements = 0;
    488 	filter->elem_buf.p = filter->elem_buf.start;
    489 	filter->data_buf.p = filter->data_buf.start;
    490 	filter->current = NULL;
    491 }
    492 
    493 void ndb_filter_free(struct ndb_filter *filter)
    494 {
    495 	if (filter->elem_buf.start)
    496 		free(filter->elem_buf.start);
    497 
    498 	memset(filter, 0, sizeof(*filter));
    499 }
    500 
    501 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field)
    502 {
    503 	switch (field) {
    504 	case NDB_FILTER_IDS: return "ids";
    505 	case NDB_FILTER_AUTHORS: return "authors";
    506 	case NDB_FILTER_KINDS: return "kinds";
    507 	case NDB_FILTER_GENERIC: return "generic";
    508 	case NDB_FILTER_SINCE: return "since";
    509 	case NDB_FILTER_UNTIL: return "until";
    510 	case NDB_FILTER_LIMIT: return "limit";
    511 	}
    512 
    513 	return "unknown";
    514 }
    515 
    516 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char generic)
    517 {
    518 	int i;
    519 	struct ndb_filter_elements *els, *el;
    520 
    521 	if (filter->current) {
    522 		fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n");
    523 		return 0;
    524 	}
    525 
    526 	// you can only start and end fields once
    527 	for (i = 0; i < filter->num_elements; i++) {
    528 		el = filter->elements[i];
    529 		if (el->field.type == field) {
    530 			fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n",
    531 					ndb_filter_field_name(field));
    532 			return 0;
    533 		}
    534 	}
    535 
    536 	els = (struct ndb_filter_elements *) filter->elem_buf.p ;
    537 	filter->current = els;
    538 
    539 	// advance elem buffer to the variable data section
    540 	if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) {
    541 		fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n",
    542 				ndb_filter_field_name(field));
    543 		return 0;
    544 	}
    545 
    546 	els->field.type = field;
    547 	els->field.generic = generic;
    548 	els->field.elem_type = 0;
    549 	els->count = 0;
    550 
    551 	return 1;
    552 }
    553 
    554 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field)
    555 {
    556 	return ndb_filter_start_field_impl(filter, field, 0);
    557 }
    558 
    559 int ndb_filter_start_generic_field(struct ndb_filter *filter, char tag)
    560 {
    561 	return ndb_filter_start_field_impl(filter, NDB_FILTER_GENERIC, tag);
    562 }
    563 
    564 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el)
    565 {
    566 	unsigned char *data;
    567 	const char *str;
    568 
    569 	if (!filter->current)
    570 		return 0;
    571 
    572 	data = filter->data_buf.p;
    573 
    574 	switch (filter->current->field.type) {
    575 	case NDB_FILTER_IDS:
    576 	case NDB_FILTER_AUTHORS:
    577 		if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32))
    578 			return 0;
    579 		el.id = data;
    580 		break;
    581 	case NDB_FILTER_KINDS:
    582 		break;
    583 	case NDB_FILTER_SINCE:
    584 	case NDB_FILTER_UNTIL:
    585 	case NDB_FILTER_LIMIT:
    586 		// only one allowed for since/until
    587 		if (filter->current->count != 0)
    588 			return 0;
    589 		break;
    590 	case NDB_FILTER_GENERIC:
    591 		str = (const char *)filter->data_buf.p;
    592 		if (!cursor_push_c_str(&filter->data_buf, el.string))
    593 			return 0;
    594 		// push a pointer of the string in the databuf as an element
    595 		el.string = str;
    596 		break;
    597 	}
    598 
    599 	if (!cursor_push(&filter->elem_buf, (unsigned char*)&el, sizeof(el)))
    600 		return 0;
    601 
    602 	filter->current->count++;
    603 
    604 	return 1;
    605 }
    606 
    607 static int ndb_filter_set_elem_type(struct ndb_filter *filter,
    608 				    enum ndb_generic_element_type elem_type)
    609 {
    610 	enum ndb_generic_element_type current_elem_type;
    611 
    612 	if (!filter->current)
    613 		return 0;
    614 
    615 	current_elem_type = filter->current->field.elem_type;
    616 
    617 	// element types must be uniform
    618 	if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) {
    619 		fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n");
    620 		return 0;
    621 	}
    622 
    623 	filter->current->field.elem_type = elem_type;
    624 
    625 	return 1;
    626 }
    627 
    628 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str)
    629 {
    630 	union ndb_filter_element el;
    631 
    632 	if (!filter->current)
    633 		return 0;
    634 
    635 	// only generic queries are allowed to have strings
    636 	switch (filter->current->field.type) {
    637 	case NDB_FILTER_SINCE:
    638 	case NDB_FILTER_UNTIL:
    639 	case NDB_FILTER_LIMIT:
    640 	case NDB_FILTER_IDS:
    641 	case NDB_FILTER_AUTHORS:
    642 	case NDB_FILTER_KINDS:
    643 		return 0;
    644 	case NDB_FILTER_GENERIC:
    645 		break;
    646 	}
    647 
    648 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING))
    649 		return 0;
    650 
    651 	el.string = str;
    652 	return ndb_filter_add_element(filter, el);
    653 }
    654 
    655 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer)
    656 {
    657 	union ndb_filter_element el;
    658 	if (!filter->current)
    659 		return 0;
    660 
    661 	switch (filter->current->field.type) {
    662 	case NDB_FILTER_IDS:
    663 	case NDB_FILTER_AUTHORS:
    664 	case NDB_FILTER_GENERIC:
    665 		return 0;
    666 	case NDB_FILTER_KINDS:
    667 	case NDB_FILTER_SINCE:
    668 	case NDB_FILTER_UNTIL:
    669 	case NDB_FILTER_LIMIT:
    670 		break;
    671 	}
    672 
    673 	el.integer = integer;
    674 
    675 	return ndb_filter_add_element(filter, el);
    676 }
    677 
    678 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id)
    679 {
    680 	union ndb_filter_element el;
    681 
    682 	if (!filter->current)
    683 		return 0;
    684 
    685 	// only certain filter types allow pushing id elements
    686 	switch (filter->current->field.type) {
    687 	case NDB_FILTER_SINCE:
    688 	case NDB_FILTER_UNTIL:
    689 	case NDB_FILTER_LIMIT:
    690 		return 0;
    691 	case NDB_FILTER_IDS:
    692 	case NDB_FILTER_AUTHORS:
    693 	case NDB_FILTER_KINDS:
    694 	case NDB_FILTER_GENERIC:
    695 		break;
    696 	}
    697 
    698 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID))
    699 		return 0;
    700 
    701 	// this is needed so that generic filters know its an id
    702 	el.id = id;
    703 
    704 	return ndb_filter_add_element(filter, el);
    705 }
    706 
// Returns 1 if any tag of `note` matches the generic filter field `els`,
// 0 otherwise.
//
// A tag is a candidate when it has at least two entries and its first entry
// is a packed string consisting of exactly the field's tag character. The
// tag's second entry is then compared against every element of the field:
// by 32-byte memcmp for id fields, by strcmp for string fields.
//
// TODO: build a hashtable so this is O(1)
static int ndb_generic_filter_matches(struct ndb_filter_elements *els,
				      struct ndb_note *note)
{
	int i;
	union ndb_filter_element el;
	struct ndb_iterator iter, *it = &iter;
	struct ndb_str str;

	ndb_tags_iterate_start(note, it);

	while (ndb_tags_iterate_next(it)) {
		// we're looking for tags with 2 or more entries: ["p", id], etc
		if (it->tag->count < 2)
			continue;

		str = ndb_tag_str(note, it->tag, 0);

		// we only care about packed strings (single char, etc)
		if (str.flag != NDB_PACKED_STR)
			continue;

		// do we have #e matching e (or p, etc)
		if (str.str[0] != els->field.generic || str.str[1] != 0)
			continue;

		// from here on `str` is the tag's value (second entry)
		str = ndb_tag_str(note, it->tag, 1);

		// NOTE: `continue` inside this switch moves on to the next tag
		switch (els->field.elem_type) {
		case NDB_ELEMENT_ID:
			// if our filter element type is an id, then we
			// expect a packed id in the tag, otherwise skip
			if (str.flag != NDB_PACKED_ID)
				continue;
			break;
		case NDB_ELEMENT_STRING:
			// if our filter element type is a string, then
			// we should not expect an id
			if (str.flag == NDB_PACKED_ID)
				continue;
			break;
		case NDB_ELEMENT_UNKNOWN:
		default:
			// For some reason the element type is not set. It's
			// possible nothing was added to the generic filter?
			// Let's just fail here and log a note for debugging
			fprintf(stderr, "UNUSUAL ndb_generic_filter_matches: have unknown element type %d\n", els->field.elem_type);
			return 0;
		}

		// linear scan of the field's elements for the tag value
		for (i = 0; i < els->count; i++) {
			el = els->elements[i];
			switch (els->field.elem_type) {
			case NDB_ELEMENT_ID:
				if (!memcmp(el.id, str.id, 32))
					return 1;
				break;
			case NDB_ELEMENT_STRING:
				if (!strcmp(el.string, str.str))
					return 1;
				break;
			case NDB_ELEMENT_UNKNOWN:
				return 0;
			}
		}
	}

	return 0;
}
    776 
    777 // returns 1 if a filter matches a note
    778 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
    779 {
    780 	int i, j;
    781 	struct ndb_filter_elements *els;
    782 
    783 	for (i = 0; i < filter->num_elements; i++) {
    784 		els = filter->elements[i];
    785 
    786 		switch (els->field.type) {
    787 		case NDB_FILTER_KINDS:
    788 			for (j = 0; j < els->count; j++) {
    789 				if ((unsigned int)els->elements[j].integer == note->kind)
    790 					goto cont;
    791 			}
    792 			break;
    793 		// TODO: add filter hashtable for large id lists
    794 		case NDB_FILTER_IDS:
    795 			for (j = 0; j < els->count; j++) {
    796 				if (!memcmp(els->elements[j].id, note->id, 32))
    797 					goto cont;
    798 			}
    799 			break;
    800 		case NDB_FILTER_AUTHORS:
    801 			for (j = 0; j < els->count; j++) {
    802 				if (!memcmp(els->elements[j].id, note->pubkey, 32))
    803 					goto cont;
    804 			}
    805 			break;
    806 		case NDB_FILTER_GENERIC:
    807 			if (ndb_generic_filter_matches(els, note))
    808 				continue;
    809 			break;
    810 		case NDB_FILTER_SINCE:
    811 			assert(els->count == 1);
    812 			if (note->created_at >= els->elements[0].integer)
    813 				continue;
    814 			break;
    815 		case NDB_FILTER_UNTIL:
    816 			assert(els->count == 1);
    817 			if (note->created_at < els->elements[0].integer)
    818 				continue;
    819 		case NDB_FILTER_LIMIT:
    820 cont:
    821 			continue;
    822 		}
    823 
    824 		// all need to match
    825 		return 0;
    826 	}
    827 
    828 	return 1;
    829 }
    830 
    831 void ndb_filter_end_field(struct ndb_filter *filter)
    832 {
    833 	filter->elements[filter->num_elements++] = filter->current;
    834 	filter->current = NULL;
    835 }
    836 
    837 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
    838 			        uint64_t timestamp, const char *search)
    839 {
    840 	memcpy(key->id, id, 32);
    841 	key->timestamp = timestamp;
    842 	lowercase_strncpy(key->search, search, sizeof(key->search) - 1);
    843 	key->search[sizeof(key->search) - 1] = '\0';
    844 }
    845 
    846 static int ndb_write_profile_search_index(struct ndb_txn *txn,
    847 					  struct ndb_search_key *index_key,
    848 					  uint64_t profile_key)
    849 {
    850 	int rc;
    851 	MDB_val key, val;
    852 	
    853 	key.mv_data = index_key;
    854 	key.mv_size = sizeof(*index_key);
    855 	val.mv_data = &profile_key;
    856 	val.mv_size = sizeof(profile_key);
    857 
    858 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
    859 			  &key, &val, 0)))
    860 	{
    861 		ndb_debug("ndb_write_profile_search_index failed: %s\n",
    862 			  mdb_strerror(rc));
    863 		return 0;
    864 	}
    865 
    866 	return 1;
    867 }
    868 
    869 
    870 // map usernames and display names to profile keys for user searching
    871 static int ndb_write_profile_search_indices(struct ndb_txn *txn,
    872 					    struct ndb_note *note,
    873 					    uint64_t profile_key,
    874 					    void *profile_root)
    875 {
    876 	struct ndb_search_key index;
    877 	NdbProfileRecord_table_t profile_record;
    878 	NdbProfile_table_t profile;
    879 
    880 	profile_record = NdbProfileRecord_as_root(profile_root);
    881 	profile = NdbProfileRecord_profile_get(profile_record);
    882 
    883 	const char *name = NdbProfile_name_get(profile);
    884 	const char *display_name = NdbProfile_display_name_get(profile);
    885 
    886 	// words + pubkey + created
    887 	if (name) {
    888 		ndb_make_search_key(&index, note->pubkey, note->created_at,
    889 				    name);
    890 		if (!ndb_write_profile_search_index(txn, &index, profile_key))
    891 			return 0;
    892 	}
    893 
    894 	if (display_name) {
    895 		// don't write the same name/display_name twice
    896 		if (name && !strcmp(display_name, name)) {
    897 			return 1;
    898 		}
    899 		ndb_make_search_key(&index, note->pubkey, note->created_at,
    900 				    display_name);
    901 		if (!ndb_write_profile_search_index(txn, &index, profile_key))
    902 			return 0;
    903 	}
    904 
    905 	return 1;
    906 }
    907 
    908 
    909 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags)
    910 {
    911 	txn->lmdb = &ndb->lmdb;
    912 	MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn;
    913 	if (!txn->lmdb->env)
    914 		return 0;
    915 	return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0;
    916 }
    917 
    918 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
    919 {
    920 	return _ndb_begin_query(ndb, txn, MDB_RDONLY);
    921 }
    922 
    923 // this should only be used in migrations, etc
    924 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn)
    925 {
    926 	return _ndb_begin_query(ndb, txn, 0);
    927 }
    928 
    929 
    930 // Migrations
    931 //
    932 
    933 static int ndb_migrate_user_search_indices(struct ndb *ndb)
    934 {
    935 	int rc;
    936 	MDB_cursor *cur;
    937 	MDB_val k, v;
    938 	void *profile_root;
    939 	NdbProfileRecord_table_t record;
    940 	struct ndb_txn txn;
    941 	struct ndb_note *note;
    942 	uint64_t note_key, profile_key;
    943 	size_t len;
    944 	int count;
    945 
    946 	if (!ndb_begin_rw_query(ndb, &txn)) {
    947 		fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n");
    948 		return 0;
    949 	}
    950 
    951 	if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) {
    952 		fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc);
    953 		return 0;
    954 	}
    955 
    956 	count = 0;
    957 
    958 	// loop through all profiles and write search indices
    959 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
    960 		profile_root = v.mv_data;
    961 		profile_key = *((uint64_t*)k.mv_data);
    962 		record = NdbProfileRecord_as_root(profile_root);
    963 		note_key = NdbProfileRecord_note_key(record);
    964 		note = ndb_get_note_by_key(&txn, note_key, &len);
    965 
    966 		if (note == NULL) {
    967 			fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n");
    968 			return 0;
    969 		}
    970 
    971 		if (!ndb_write_profile_search_indices(&txn, note, profile_key,
    972 						      profile_root)) {
    973 
    974 			fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
    975 			return 0;
    976 		}
    977 
    978 		count++;
    979 	}
    980 
    981 	fprintf(stderr, "migrated %d profiles to include search indices\n", count);
    982 
    983 	mdb_cursor_close(cur);
    984 
    985 	ndb_end_query(&txn);
    986 
    987 	return 1;
    988 }
    989 
    990 static int ndb_migrate_lower_user_search_indices(struct ndb *ndb)
    991 {
    992 	MDB_txn *txn;
    993 
    994 	if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) {
    995 		fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n");
    996 		return 0;
    997 	}
    998 
    999 	// just drop the search db so we can rebuild it
   1000 	if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) {
   1001 		fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n");
   1002 		return 0;
   1003 	}
   1004 
   1005 	mdb_txn_commit(txn);
   1006 
   1007 	return ndb_migrate_user_search_indices(ndb);
   1008 }
   1009 
   1010 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile);
   1011 
   1012 
   1013 int ndb_db_version(struct ndb *ndb)
   1014 {
   1015 	int rc;
   1016 	uint64_t version, version_key;
   1017 	MDB_val k, v;
   1018 	MDB_txn *txn;
   1019 
   1020 	version_key = NDB_META_KEY_VERSION;
   1021 	k.mv_data = &version_key;
   1022 	k.mv_size = sizeof(version_key);
   1023 
   1024 	if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) {
   1025 		fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc);
   1026 		return -1;
   1027 	}
   1028 
   1029 	if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) {
   1030 		version = -1;
   1031 	} else {
   1032 		if (v.mv_size != 8) {
   1033 			fprintf(stderr, "run_migrations: invalid version size?");
   1034 			return 0;
   1035 		}
   1036 		version = *((uint64_t*)v.mv_data);
   1037 	}
   1038 
   1039 	mdb_txn_abort(txn);
   1040 	return version;
   1041 }
   1042 
   1043 // custom kind+timestamp comparison function. This is used by lmdb to perform
   1044 // b+ tree searches over the kind+timestamp index
   1045 static int ndb_u64_tsid_compare(const MDB_val *a, const MDB_val *b)
   1046 {
   1047 	struct ndb_u64_tsid *tsa, *tsb;
   1048 	tsa = a->mv_data;
   1049 	tsb = b->mv_data;
   1050 
   1051 	if (tsa->u64 < tsb->u64)
   1052 		return -1;
   1053 	else if (tsa->u64 > tsb->u64)
   1054 		return 1;
   1055 
   1056 	if (tsa->timestamp < tsb->timestamp)
   1057 		return -1;
   1058 	else if (tsa->timestamp > tsb->timestamp)
   1059 		return 1;
   1060 
   1061 	return 0;
   1062 }
   1063 
   1064 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b)
   1065 {
   1066 	struct ndb_tsid *tsa, *tsb;
   1067 	MDB_val a2 = *a, b2 = *b;
   1068 
   1069 	a2.mv_size = sizeof(tsa->id);
   1070 	b2.mv_size = sizeof(tsb->id);
   1071 
   1072 	int cmp = mdb_cmp_memn(&a2, &b2);
   1073 	if (cmp) return cmp;
   1074 
   1075 	tsa = a->mv_data;
   1076 	tsb = b->mv_data;
   1077 
   1078 	if (tsa->timestamp < tsb->timestamp)
   1079 		return -1;
   1080 	else if (tsa->timestamp > tsb->timestamp)
   1081 		return 1;
   1082 	return 0;
   1083 }
   1084 
   1085 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id)
   1086 {
   1087 	memcpy(key->id, id, 32);
   1088 	key->timestamp = 0;
   1089 }
   1090 
   1091 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id,
   1092 				 uint64_t timestamp)
   1093 {
   1094 	memcpy(key->id, id, 32);
   1095 	key->timestamp = timestamp;
   1096 }
   1097 
   1098 static inline void ndb_u64_tsid_init(struct ndb_u64_tsid *key, uint64_t integer,
   1099 				     uint64_t timestamp)
   1100 {
   1101 	key->u64 = integer;
   1102 	key->timestamp = timestamp;
   1103 }
   1104 
   1105 // useful for range-searching for the latest key with a clustered created_at timen
   1106 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id)
   1107 {
   1108 	memcpy(key->id, id, 32);
   1109 	key->timestamp = UINT64_MAX;
   1110 }
   1111 
// Discriminator for struct ndb_ingester_msg
enum ndb_ingester_msgtype {
	NDB_INGEST_EVENT, // write json to the ingester queue for processing 
	NDB_INGEST_QUIT,  // kill ingester thread immediately
};
   1116 
// A raw json message waiting to be parsed by the ingester.
struct ndb_ingester_event {
	// heap-allocated json text; ndb_ingester_process_event free()s it
	char *json;
	// when set, json is parsed with ndb_client_event_from_json,
	// otherwise with ndb_ws_event_from_json
	unsigned client : 1; // ["EVENT", {...}] messages
	// length of json, as passed to the json parsers
	unsigned len : 31;
};
   1122 
// A parsed note handed to the writer, together with its buffer size.
struct ndb_writer_note {
	struct ndb_note *note;
	// size in bytes of the buffer that `note` points to
	size_t note_len;
};
   1127 
// A profile note handed to the writer: the note itself plus the
// profile record builder filled in by ndb_process_profile_note.
struct ndb_writer_profile {
	struct ndb_writer_note note;
	struct ndb_profile_record_builder record;
};
   1132 
// A message for the ingester: `type` selects which union member is valid.
struct ndb_ingester_msg {
	enum ndb_ingester_msgtype type;
	union {
		// presumably valid when type is NDB_INGEST_EVENT -- confirm
		struct ndb_ingester_event event;
	};
};
   1139 
// Payload of a writer message that carries the database version.
struct ndb_writer_ndb_meta {
	// these are 64 bit because I'm paranoid of db-wide alignment issues
	uint64_t version;
};
   1144 
// Used in the writer thread when writing ndb_profile_fetch_record's
//   kv = pubkey: record
struct ndb_writer_last_fetch {
	// 32-byte pubkey, used as the database key
	unsigned char pubkey[32];
	// value stored for that pubkey
	uint64_t fetched_at;
};
   1151 
// The different types of messages that the writer thread can write to the
// database. `type` selects which union member is valid.
struct ndb_writer_msg {
	enum ndb_writer_msgtype type;
	union {
		struct ndb_writer_note note;            // used with NDB_WRITER_NOTE
		struct ndb_writer_profile profile;      // used with NDB_WRITER_PROFILE
		struct ndb_writer_ndb_meta ndb_meta;
		struct ndb_writer_last_fetch last_fetch; // used with NDB_WRITER_PROFILE_LAST_FETCH
	};
};
   1163 
   1164 static inline int ndb_writer_queue_msg(struct ndb_writer *writer,
   1165 				       struct ndb_writer_msg *msg)
   1166 {
   1167 	return prot_queue_push(&writer->inbox, msg);
   1168 }
   1169 
   1170 static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
   1171 {
   1172 	int rc;
   1173 	MDB_cursor *cur;
   1174 	MDB_val k, v;
   1175 	void *profile_root;
   1176 	NdbProfileRecord_table_t record;
   1177 	struct ndb_txn txn;
   1178 	struct ndb_note *note, *copied_note;
   1179 	uint64_t note_key;
   1180 	size_t len;
   1181 	int count, failed;
   1182 	struct ndb_writer_msg out;
   1183 
   1184 	if (!ndb_begin_rw_query(ndb, &txn)) {
   1185 		fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n");
   1186 		return 0;
   1187 	}
   1188 
   1189 	if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) {
   1190 		fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc);
   1191 		return 0;
   1192 	}
   1193 
   1194 	count = 0;
   1195 	failed = 0;
   1196 
   1197 	// loop through all profiles and write search indices
   1198 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   1199 		profile_root = v.mv_data;
   1200 		record = NdbProfileRecord_as_root(profile_root);
   1201 		note_key = NdbProfileRecord_note_key(record);
   1202 		note = ndb_get_note_by_key(&txn, note_key, &len);
   1203 
   1204 		if (note == NULL) {
   1205 			fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n");
   1206 			return 0;
   1207 		}
   1208 
   1209 		struct ndb_profile_record_builder *b = &out.profile.record;
   1210 
   1211 		// reprocess profile
   1212 		if (!ndb_process_profile_note(note, b)) {
   1213 			failed++;
   1214 			continue;
   1215 		}
   1216 
   1217 		// the writer needs to own this note, and its expected to free it
   1218 		copied_note = malloc(len);
   1219 		memcpy(copied_note, note, len);
   1220 
   1221 		out.type = NDB_WRITER_PROFILE;
   1222 		out.profile.note.note = copied_note;
   1223 		out.profile.note.note_len = len;
   1224 
   1225 		ndb_writer_queue_msg(&ndb->writer, &out);
   1226 
   1227 		count++;
   1228 	}
   1229 
   1230 	fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count);
   1231 
   1232 	if (failed != 0) {
   1233 		fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed);
   1234 	}
   1235 
   1236 	mdb_cursor_close(cur);
   1237 
   1238 	ndb_end_query(&txn);
   1239 
   1240 	return 1;
   1241 }
   1242 
// Table of database migration functions.
// NOTE(review): presumably the position in this table corresponds to the
// database version, so entries should only be appended -- confirm against
// the code that runs the migrations.
static struct ndb_migration MIGRATIONS[] = {
	{ .fn = ndb_migrate_user_search_indices },
	{ .fn = ndb_migrate_lower_user_search_indices },
	{ .fn = ndb_migrate_utf8_profile_names }
};
   1248 
   1249 
   1250 int ndb_end_query(struct ndb_txn *txn)
   1251 {
   1252 	// this works on read or write queries. 
   1253 	return mdb_txn_commit(txn->mdb_txn) == 0;
   1254 }
   1255 
   1256 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
   1257 		    unsigned char sig[64])
   1258 {
   1259 	secp256k1_xonly_pubkey xonly_pubkey;
   1260 	int ok;
   1261 
   1262 	ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey,
   1263 					  pubkey) != 0;
   1264 	if (!ok) return 0;
   1265 
   1266 	ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32,
   1267 					 &xonly_pubkey) > 0;
   1268 	if (!ok) return 0;
   1269 
   1270 	return 1;
   1271 }
   1272 
   1273 static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
   1274 					struct ndb_writer_msg *msgs,
   1275 					int num_msgs)
   1276 {
   1277 	return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
   1278 }
   1279 
   1280 static int ndb_writer_queue_note(struct ndb_writer *writer,
   1281 				 struct ndb_note *note, size_t note_len)
   1282 {
   1283 	struct ndb_writer_msg msg;
   1284 	msg.type = NDB_WRITER_NOTE;
   1285 
   1286 	msg.note.note = note;
   1287 	msg.note.note_len = note_len;
   1288 
   1289 	return prot_queue_push(&writer->inbox, &msg);
   1290 }
   1291 
   1292 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
   1293 					  const unsigned char *pubkey,
   1294 					  uint64_t fetched_at)
   1295 {
   1296 	int rc;
   1297 	MDB_val key, val;
   1298 	
   1299 	key.mv_data = (unsigned char*)pubkey;
   1300 	key.mv_size = 32;
   1301 	val.mv_data = &fetched_at;
   1302 	val.mv_size = sizeof(fetched_at);
   1303 
   1304 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH],
   1305 			  &key, &val, 0)))
   1306 	{
   1307 		ndb_debug("write version to ndb_meta failed: %s\n",
   1308 				mdb_strerror(rc));
   1309 		return;
   1310 	}
   1311 
   1312 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
   1313 }
   1314 
   1315 
   1316 // We just received a profile that we haven't processed yet, but it could
   1317 // be an older one! Make sure we only write last fetched profile if it's a new
   1318 // one
   1319 //
   1320 // To do this, we first check the latest profile in the database. If the
   1321 // created_date for this profile note is newer, then we write a
   1322 // last_profile_fetch record, otherwise we do not.
   1323 //
   1324 // WARNING: This function is only valid when called from the writer thread
   1325 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn,
   1326 					       struct ndb_note *note)
   1327 {
   1328 	size_t len;
   1329 	uint64_t profile_key, note_key;
   1330 	void *root;
   1331 	struct ndb_note *last_profile;
   1332 	NdbProfileRecord_table_t record;
   1333 
   1334 	if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) {
   1335 		record = NdbProfileRecord_as_root(root);
   1336 		note_key = NdbProfileRecord_note_key(record);
   1337 		last_profile = ndb_get_note_by_key(txn, note_key, &len);
   1338 		if (last_profile == NULL) {
   1339 			return 0;
   1340 		}
   1341 
   1342 		// found profile, let's see if it's newer than ours
   1343 		if (note->created_at > last_profile->created_at) {
   1344 			// this is a new profile note, record last fetched time
   1345 			ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
   1346 		}
   1347 	} else {
   1348 		// couldn't fetch profile. record last fetched time
   1349 		ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
   1350 	}
   1351 
   1352 	return 1;
   1353 }
   1354 
   1355 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
   1356 				 uint64_t fetched_at)
   1357 {
   1358 	struct ndb_writer_msg msg;
   1359 	msg.type = NDB_WRITER_PROFILE_LAST_FETCH;
   1360 	memcpy(&msg.last_fetch.pubkey[0], pubkey, 32);
   1361 	msg.last_fetch.fetched_at = fetched_at;
   1362 
   1363 	return ndb_writer_queue_msg(&ndb->writer, &msg);
   1364 }
   1365 
   1366 // get some value based on a clustered id key
   1367 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id,
   1368 		 MDB_val *val)
   1369 {
   1370 	MDB_val k, v;
   1371 	MDB_cursor *cur;
   1372 	int success = 0, rc;
   1373 	struct ndb_tsid tsid;
   1374 
   1375 	// position at the most recent
   1376 	ndb_tsid_high(&tsid, id);
   1377 
   1378 	k.mv_data = &tsid;
   1379 	k.mv_size = sizeof(tsid);
   1380 
   1381 	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) {
   1382 		ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc));
   1383 		return 0;
   1384 	}
   1385 
   1386 	// Position cursor at the next key greater than or equal to the specified key
   1387 	if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) {
   1388 		// Failed :(. It could be the last element?
   1389 		if (mdb_cursor_get(cur, &k, &v, MDB_LAST))
   1390 			goto cleanup;
   1391 	} else {
   1392 		// if set range worked and our key exists, it should be
   1393 		// the one right before this one
   1394 		if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   1395 			goto cleanup;
   1396 	}
   1397 
   1398 	if (memcmp(k.mv_data, id, 32) == 0) {
   1399 		*val = v;
   1400 		success = 1;
   1401 	}
   1402 
   1403 cleanup:
   1404 	mdb_cursor_close(cur);
   1405 	return success;
   1406 }
   1407 
   1408 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key,
   1409 			       enum ndb_dbs store, size_t *len)
   1410 {
   1411 	MDB_val k, v;
   1412 
   1413 	k.mv_data = &key;
   1414 	k.mv_size = sizeof(key);
   1415 
   1416 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
   1417 		ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
   1418 		return NULL;
   1419 	}
   1420 
   1421 	if (len)
   1422 		*len = v.mv_size;
   1423 
   1424 	return v.mv_data;
   1425 }
   1426 
   1427 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
   1428 			     enum ndb_dbs store, const unsigned char *pk,
   1429 			     size_t *len, uint64_t *primkey)
   1430 {
   1431 	MDB_val k, v;
   1432 	void *res = NULL;
   1433 	if (len)
   1434 		*len = 0;
   1435 
   1436 	if (!ndb_get_tsid(txn, ind, pk, &k)) {
   1437 		//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
   1438 		return 0;
   1439 	}
   1440 
   1441 	if (primkey)
   1442 		*primkey = *(uint64_t*)k.mv_data;
   1443 
   1444 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
   1445 		ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
   1446 		return 0;
   1447 	}
   1448 
   1449 	res = v.mv_data;
   1450 	assert(((uint64_t)res % 4) == 0);
   1451 	if (len)
   1452 		*len = v.mv_size;
   1453 	return res;
   1454 }
   1455 
   1456 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key)
   1457 {
   1458 	return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key);
   1459 }
   1460 
   1461 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key)
   1462 {
   1463 	return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key);
   1464 }
   1465 
   1466 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn,
   1467 					      enum ndb_dbs db,
   1468 					      const unsigned char *id)
   1469 {
   1470 	MDB_val k;
   1471 
   1472 	if (!ndb_get_tsid(txn, db, id, &k))
   1473 		return 0;
   1474 
   1475 	return *(uint32_t*)k.mv_data;
   1476 }
   1477 
   1478 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id)
   1479 {
   1480 	return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id);
   1481 }
   1482 
   1483 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id)
   1484 {
   1485 	return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id);
   1486 }
   1487 
   1488 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
   1489 {
   1490 	return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len);
   1491 }
   1492 
   1493 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
   1494 {
   1495 	return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len);
   1496 }
   1497 
   1498 uint64_t
   1499 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey)
   1500 {
   1501 	MDB_val k, v;
   1502 
   1503 	k.mv_data = (unsigned char*)pubkey;
   1504 	k.mv_size = 32;
   1505 
   1506 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) {
   1507 		//ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n");
   1508 		return 0;
   1509 	}
   1510 
   1511 	return *((uint64_t*)v.mv_data);
   1512 }
   1513 
   1514 
   1515 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id)
   1516 {
   1517 	MDB_val val;
   1518 
   1519 	if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val))
   1520 		return 0;
   1521 
   1522 	return 1;
   1523 }
   1524 
   1525 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb,
   1526 			     MDB_txn *mdb_txn)
   1527 {
   1528 	txn->lmdb = lmdb;
   1529 	txn->mdb_txn = mdb_txn;
   1530 }
   1531 
   1532 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
   1533 {
   1534 	unsigned char id[32];
   1535 	struct ndb_ingest_controller *c = data;
   1536 	struct ndb_txn txn;
   1537 
   1538 	hex_decode(hexid, 64, id, sizeof(id));
   1539 
   1540 	// let's see if we already have it
   1541 
   1542 	ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
   1543 	if (!ndb_has_note(&txn, id))
   1544 		return NDB_IDRES_CONT;
   1545 
   1546 	return NDB_IDRES_STOP;
   1547 }
   1548 
   1549 static int ndbprofile_parse_json(flatcc_builder_t *B,
   1550         const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile)
   1551 {
   1552 	flatcc_json_parser_t parser, *ctx = &parser;
   1553 	flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags);
   1554 
   1555 	if (flatcc_builder_start_buffer(B, 0, 0, 0))
   1556 		return 0;
   1557 
   1558 	NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile);
   1559 	if (ctx->error)
   1560 		return 0;
   1561  
   1562 	if (!flatcc_builder_end_buffer(B, *profile))
   1563 		return 0;
   1564 
   1565 	ctx->end_loc = buf;
   1566 
   1567 
   1568 	return 1;
   1569 }
   1570 
   1571 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b)
   1572 {
   1573 	b->builder = malloc(sizeof(*b->builder));
   1574 	b->flatbuf = NULL;
   1575 }
   1576 
   1577 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b)
   1578 {
   1579 	if (b->builder)
   1580 		free(b->builder);
   1581 	if (b->flatbuf)
   1582 		free(b->flatbuf);
   1583 
   1584 	b->builder = NULL;
   1585 	b->flatbuf = NULL;
   1586 }
   1587 
   1588 int ndb_process_profile_note(struct ndb_note *note,
   1589 			     struct ndb_profile_record_builder *profile)
   1590 {
   1591 	int res;
   1592 
   1593 	NdbProfile_ref_t profile_table;
   1594 	flatcc_builder_t *builder;
   1595 
   1596 	ndb_profile_record_builder_init(profile);
   1597 	builder = profile->builder;
   1598 	flatcc_builder_init(builder);
   1599 
   1600 	NdbProfileRecord_start_as_root(builder);
   1601 
   1602 	//printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note));
   1603 	if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note),
   1604 					  note->content_length,
   1605 					  flatcc_json_parser_f_skip_unknown,
   1606 					  &profile_table)))
   1607 	{
   1608 		ndb_debug("profile_parse_json failed %d '%.*s'\n", res,
   1609 			  note->content_length, ndb_note_content(note));
   1610 		ndb_profile_record_builder_free(profile);
   1611 		return 0;
   1612 	}
   1613 
   1614 	uint64_t received_at = time(NULL);
   1615 	const char *lnurl = "fixme";
   1616 
   1617 	NdbProfileRecord_profile_add(builder, profile_table);
   1618 	NdbProfileRecord_received_at_add(builder, received_at);
   1619 
   1620 	flatcc_builder_ref_t lnurl_off;
   1621 	lnurl_off = flatcc_builder_create_string_str(builder, lnurl);
   1622 
   1623 	NdbProfileRecord_lnurl_add(builder, lnurl_off);
   1624 
   1625 	//*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len);
   1626 	return 1;
   1627 }
   1628 
   1629 static int ndb_ingester_process_note(secp256k1_context *ctx,
   1630 				     struct ndb_note *note,
   1631 				     size_t note_size,
   1632 				     struct ndb_writer_msg *out,
   1633 				     struct ndb_ingester *ingester)
   1634 {
   1635 	enum ndb_ingest_filter_action action;
   1636 	action = NDB_INGEST_ACCEPT;
   1637 
   1638 	if (ingester->filter)
   1639 		action = ingester->filter(ingester->filter_context, note);
   1640 
   1641 	if (action == NDB_INGEST_REJECT)
   1642 		return 0;
   1643 
   1644 	// some special situations we might want to skip sig validation,
   1645 	// like during large imports
   1646 	if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
   1647 		// if we're skipping validation we don't need to verify
   1648 	} else {
   1649 		// verify! If it's an invalid note we don't need to
   1650 		// bother writing it to the database
   1651 		if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
   1652 			ndb_debug("signature verification failed\n");
   1653 			return 0;
   1654 		}
   1655 	}
   1656 
   1657 	// we didn't find anything. let's send it
   1658 	// to the writer thread
   1659 	note = realloc(note, note_size);
   1660 	assert(((uint64_t)note % 4) == 0);
   1661 
   1662 	if (note->kind == 0) {
   1663 		struct ndb_profile_record_builder *b = 
   1664 			&out->profile.record;
   1665 
   1666 		ndb_process_profile_note(note, b);
   1667 
   1668 		out->type = NDB_WRITER_PROFILE;
   1669 		out->profile.note.note = note;
   1670 		out->profile.note.note_len = note_size;
   1671 	} else {
   1672 		out->type = NDB_WRITER_NOTE;
   1673 		out->note.note = note;
   1674 		out->note.note_len = note_size;
   1675 	}
   1676 
   1677 	return 1;
   1678 }
   1679 
   1680 
   1681 static int ndb_ingester_process_event(secp256k1_context *ctx,
   1682 				      struct ndb_ingester *ingester,
   1683 				      struct ndb_ingester_event *ev,
   1684 				      struct ndb_writer_msg *out,
   1685 				      MDB_txn *read_txn
   1686 				      )
   1687 {
   1688 	struct ndb_tce tce;
   1689 	struct ndb_fce fce;
   1690 	struct ndb_note *note;
   1691 	struct ndb_ingest_controller controller;
   1692 	struct ndb_id_cb cb;
   1693 	void *buf;
   1694 	int ok;
   1695 	size_t bufsize, note_size;
   1696 
   1697 	ok = 0;
   1698 
   1699 	// we will use this to check if we already have it in the DB during
   1700 	// ID parsing
   1701 	controller.read_txn = read_txn;
   1702 	controller.lmdb = ingester->writer->lmdb;
   1703 	cb.fn = ndb_ingester_json_controller;
   1704 	cb.data = &controller;
   1705 
   1706 	// since we're going to be passing this allocated note to a different
   1707 	// thread, we can't use thread-local buffers. just allocate a block
   1708         bufsize = max(ev->len * 8.0, 4096);
   1709 	buf = malloc(bufsize);
   1710 	if (!buf) {
   1711 		ndb_debug("couldn't malloc buf\n");
   1712 		return 0;
   1713 	}
   1714 
   1715 	note_size =
   1716 		ev->client ? 
   1717 		ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
   1718 		ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
   1719 
   1720 	if ((int)note_size == -42) {
   1721 		// we already have this!
   1722 		//ndb_debug("already have id??\n");
   1723 		goto cleanup;
   1724 	} else if (note_size == 0) {
   1725 		ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json);
   1726 		goto cleanup;
   1727 	}
   1728 
   1729 	//ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json);
   1730 
   1731 	if (ev->client) {
   1732 		switch (fce.evtype) {
   1733 		case NDB_FCE_EVENT:
   1734 			note = fce.event.note;
   1735 			if (note != buf) {
   1736 				ndb_debug("note buffer not equal to malloc'd buffer\n");
   1737 				goto cleanup;
   1738 			}
   1739 
   1740 			if (!ndb_ingester_process_note(ctx, note, note_size,
   1741 						       out, ingester)) {
   1742 				goto cleanup;
   1743 			} else {
   1744 				// we're done with the original json, free it
   1745 				free(ev->json);
   1746 				return 1;
   1747 			}
   1748 		}
   1749 	} else {
   1750 		switch (tce.evtype) {
   1751 		case NDB_TCE_NOTICE: goto cleanup;
   1752 		case NDB_TCE_EOSE:   goto cleanup;
   1753 		case NDB_TCE_OK:     goto cleanup;
   1754 		case NDB_TCE_EVENT:
   1755 			note = tce.event.note;
   1756 			if (note != buf) {
   1757 				ndb_debug("note buffer not equal to malloc'd buffer\n");
   1758 				goto cleanup;
   1759 			}
   1760 
   1761 			if (!ndb_ingester_process_note(ctx, note, note_size,
   1762 						       out, ingester)) {
   1763 				goto cleanup;
   1764 			} else {
   1765 				// we're done with the original json, free it
   1766 				free(ev->json);
   1767 				return 1;
   1768 			}
   1769 		}
   1770 	}
   1771 
   1772 
   1773 cleanup:
   1774 	free(ev->json);
   1775 	free(buf);
   1776 
   1777 	return ok;
   1778 }
   1779 
   1780 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
   1781 {
   1782 	MDB_cursor *mc;
   1783 	MDB_val key, val;
   1784 
   1785 	if (mdb_cursor_open(txn, db, &mc))
   1786 		return 0;
   1787 
   1788 	if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) {
   1789 		mdb_cursor_close(mc);
   1790 		return 0;
   1791 	}
   1792 
   1793 	mdb_cursor_close(mc);
   1794 
   1795 	assert(key.mv_size == 8);
   1796         return *((uint64_t*)key.mv_data);
   1797 }
   1798 
   1799 //
   1800 // make a search key meant for user queries without any other note info
   1801 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search)
   1802 {
   1803 	memset(key->id, 0, sizeof(key->id));
   1804 	key->timestamp = 0;
   1805 	lowercase_strncpy(key->search, search, sizeof(key->search) - 1);
   1806 	key->search[sizeof(key->search) - 1] = '\0';
   1807 }
   1808 
   1809 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query)
   1810 {
   1811 	int rc;
   1812 	struct ndb_search_key s;
   1813 	MDB_val k, v;
   1814 	search->cursor = NULL;
   1815 
   1816 	MDB_cursor **cursor = (MDB_cursor **)&search->cursor;
   1817 
   1818 	ndb_make_search_key_low(&s, query);
   1819 
   1820 	k.mv_data = &s;
   1821 	k.mv_size = sizeof(s);
   1822 
   1823 	if ((rc = mdb_cursor_open(txn->mdb_txn,
   1824 				  txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
   1825 				  cursor))) {
   1826 		printf("search_profile: cursor opened failed: %s\n",
   1827 				mdb_strerror(rc));
   1828 		return 0;
   1829 	}
   1830 
   1831 	// Position cursor at the next key greater than or equal to the specified key
   1832 	if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) {
   1833 		printf("search_profile: cursor get failed\n");
   1834 		goto cleanup;
   1835 	} else {
   1836 		search->key = k.mv_data;
   1837 		assert(v.mv_size == 8);
   1838 		search->profile_key = *((uint64_t*)v.mv_data);
   1839 		return 1;
   1840 	}
   1841 
   1842 cleanup:
   1843 	mdb_cursor_close(search->cursor);
   1844 	search->cursor = NULL;
   1845 	return 0;
   1846 }
   1847 
   1848 void ndb_search_profile_end(struct ndb_search *search)
   1849 {
   1850 	if (search->cursor)
   1851 		mdb_cursor_close(search->cursor);
   1852 }
   1853 
   1854 int ndb_search_profile_next(struct ndb_search *search)
   1855 {
   1856 	int rc;
   1857 	MDB_val k, v;
   1858 	unsigned char *init_id;
   1859 
   1860 	init_id = search->key->id;
   1861 	k.mv_data = search->key;
   1862 	k.mv_size = sizeof(*search->key);
   1863 
   1864 retry:
   1865 	if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) {
   1866 		ndb_debug("ndb_search_profile_next: %s\n",
   1867 				mdb_strerror(rc));
   1868 		return 0;
   1869 	} else {
   1870 		search->key = k.mv_data;
   1871 		assert(v.mv_size == 8);
   1872 		search->profile_key = *((uint64_t*)v.mv_data);
   1873 
   1874 		// skip duplicate pubkeys
   1875 		if (!memcmp(init_id, search->key->id, 32))
   1876 			goto retry;
   1877 	}
   1878 
   1879 	return 1;
   1880 }
   1881 
   1882 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
   1883 {
   1884 	int cmp;
   1885 	struct ndb_search_key *ska, *skb;
   1886 
   1887 	ska = a->mv_data;
   1888 	skb = b->mv_data;
   1889 
   1890 	MDB_val a2 = *a;
   1891 	MDB_val b2 = *b;
   1892 
   1893 	a2.mv_data = ska->search;
   1894 	a2.mv_size = sizeof(ska->search) + sizeof(ska->id);
   1895 
   1896 	cmp = mdb_cmp_memn(&a2, &b2);
   1897 	if (cmp) return cmp;
   1898 
   1899 	if (ska->timestamp < skb->timestamp)
   1900 		return -1;
   1901 	else if (ska->timestamp > skb->timestamp)
   1902 		return 1;
   1903 
   1904 	return 0;
   1905 }
   1906 
   1907 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key)
   1908 	
   1909 {
   1910 	MDB_val key, val;
   1911 	int rc;
   1912 	struct ndb_tsid tsid;
   1913 	MDB_dbi pk_db;
   1914 
   1915 	pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK];
   1916 
   1917 	// write profile_pk + created_at index
   1918 	ndb_tsid_init(&tsid, note->pubkey, note->created_at);
   1919 
   1920 	key.mv_data = &tsid;
   1921 	key.mv_size = sizeof(tsid);
   1922 	val.mv_data = &profile_key;
   1923 	val.mv_size = sizeof(profile_key);
   1924 
   1925 	if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) {
   1926 		ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
   1927 				profile_key, mdb_strerror(rc));
   1928 		return 0;
   1929 	}
   1930 
   1931 	return 1;
   1932 }
   1933 
   1934 static int ndb_write_profile(struct ndb_txn *txn,
   1935 			     struct ndb_writer_profile *profile,
   1936 			     uint64_t note_key)
   1937 {
   1938 	uint64_t profile_key;
   1939 	struct ndb_note *note;
   1940 	void *flatbuf;
   1941 	size_t flatbuf_len;
   1942 	int rc;
   1943 
   1944 	MDB_val key, val;
   1945 	MDB_dbi profile_db;
   1946 	
   1947 	note = profile->note.note;
   1948 
   1949 	// add note_key to profile record
   1950 	NdbProfileRecord_note_key_add(profile->record.builder, note_key);
   1951 	NdbProfileRecord_end_as_root(profile->record.builder);
   1952 
   1953 	flatbuf = profile->record.flatbuf =
   1954 		flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len);
   1955 
   1956 	assert(((uint64_t)flatbuf % 8) == 0);
   1957 
   1958 	// TODO: this may not be safe!?
   1959 	flatbuf_len = (flatbuf_len + 7) & ~7;
   1960 
   1961 	//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
   1962 
   1963 	// get dbs
   1964 	profile_db = txn->lmdb->dbs[NDB_DB_PROFILE];
   1965 
   1966 	// get new key
   1967 	profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1;
   1968 
   1969 	// write profile to profile store
   1970 	key.mv_data = &profile_key;
   1971 	key.mv_size = sizeof(profile_key);
   1972 	val.mv_data = flatbuf;
   1973 	val.mv_size = flatbuf_len;
   1974 
   1975 	if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) {
   1976 		ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
   1977 		return 0;
   1978 	}
   1979 
   1980 	// write last fetched record
   1981 	if (!ndb_maybe_write_last_profile_fetch(txn, note)) {
   1982 		ndb_debug("failed to write last profile fetched record\n");
   1983 		return 0;
   1984 	}
   1985 
   1986 	// write profile pubkey index
   1987 	if (!ndb_write_profile_pk_index(txn, note, profile_key)) {
   1988 		ndb_debug("failed to write profile pubkey index\n");
   1989 		return 0;
   1990 	}
   1991 
   1992 	// write name, display_name profile search indices
   1993 	if (!ndb_write_profile_search_indices(txn, note, profile_key,
   1994 					      flatbuf)) {
   1995 		ndb_debug("failed to write profile search indices\n");
   1996 		return 0;
   1997 	}
   1998 
   1999 	return 1;
   2000 }
   2001 
   2002 // find the last id tag in a note (e, p, etc)
   2003 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type)
   2004 {
   2005 	unsigned char *last = NULL;
   2006 	struct ndb_iterator iter;
   2007 	struct ndb_str str;
   2008 
   2009 	// get the liked event id (last id)
   2010 	ndb_tags_iterate_start(note, &iter);
   2011 
   2012 	while (ndb_tags_iterate_next(&iter)) {
   2013 		if (iter.tag->count < 2)
   2014 			continue;
   2015 
   2016 		str = ndb_tag_str(note, iter.tag, 0);
   2017 
   2018 		// assign liked to the last e tag
   2019 		if (str.flag == NDB_PACKED_STR && str.str[0] == type) {
   2020 			str = ndb_tag_str(note, iter.tag, 1);
   2021 			if (str.flag == NDB_PACKED_ID)
   2022 				last = str.id;
   2023 		}
   2024 	}
   2025 
   2026 	return last;
   2027 }
   2028 
   2029 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len)
   2030 {
   2031 	MDB_val k, v;
   2032 
   2033 	k.mv_data = (unsigned char*)id;
   2034 	k.mv_size = 32;
   2035 
   2036 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) {
   2037 		ndb_debug("ndb_get_note_meta: mdb_get note failed\n");
   2038 		return NULL;
   2039 	}
   2040 
   2041 	if (len)
   2042 		*len = v.mv_size;
   2043 
   2044 	return v.mv_data;
   2045 }
   2046 
   2047 // When receiving a reaction note, look for the liked id and increase the
   2048 // reaction counter in the note metadata database
   2049 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
   2050 {
   2051 	size_t len;
   2052 	void *root;
   2053 	int reactions, rc;
   2054 	MDB_val key, val;
   2055 	NdbEventMeta_table_t meta;
   2056 	unsigned char *liked = ndb_note_last_id_tag(note, 'e');
   2057 
   2058 	if (liked == NULL)
   2059 		return 0;
   2060 
   2061 	root = ndb_get_note_meta(txn, liked, &len);
   2062 
   2063 	flatcc_builder_t builder;
   2064 	flatcc_builder_init(&builder);
   2065 	NdbEventMeta_start_as_root(&builder);
   2066 
   2067 	// no meta record, let's make one
   2068 	if (root == NULL) {
   2069 		NdbEventMeta_reactions_add(&builder, 1);
   2070 	} else {
   2071 		// clone existing and add to it
   2072 		meta = NdbEventMeta_as_root(root);
   2073 	
   2074 		reactions = NdbEventMeta_reactions_get(meta);
   2075 		NdbEventMeta_clone(&builder, meta);
   2076 		NdbEventMeta_reactions_add(&builder, reactions + 1);
   2077 	}
   2078 
   2079 	NdbProfileRecord_end_as_root(&builder);
   2080 	root = flatcc_builder_finalize_aligned_buffer(&builder, &len);
   2081 	assert(((uint64_t)root % 8) == 0);
   2082 
   2083 	if (root == NULL) {
   2084 		ndb_debug("failed to create note metadata record\n");
   2085 		return 0;
   2086 	}
   2087 
   2088 	// metadata is keyed on id because we want to collect stats regardless
   2089 	// if we have the note yet or not
   2090 	key.mv_data = liked;
   2091 	key.mv_size = 32;
   2092 	
   2093 	val.mv_data = root;
   2094 	val.mv_size = len;
   2095 
   2096 	// write the new meta record
   2097 	//ndb_debug("writing stats record for ");
   2098 	//print_hex(liked, 32);
   2099 	//ndb_debug("\n");
   2100 
   2101 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) {
   2102 		ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc));
   2103 		return 0;
   2104 	}
   2105 
   2106 	free(root);
   2107 
   2108 	return 1;
   2109 }
   2110 
   2111 
   2112 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
   2113 				   uint64_t note_key)
   2114 	
   2115 {
   2116 	struct ndb_tsid tsid;
   2117 	int rc;
   2118 	MDB_val key, val;
   2119 	MDB_dbi id_db;
   2120 
   2121 	ndb_tsid_init(&tsid, note->id, note->created_at);
   2122 
   2123 	key.mv_data = &tsid;
   2124 	key.mv_size = sizeof(tsid);
   2125 	val.mv_data = &note_key;
   2126 	val.mv_size = sizeof(note_key);
   2127 
   2128 	id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   2129 
   2130 	if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) {
   2131 		ndb_debug("write note id index to db failed: %s\n",
   2132 				mdb_strerror(rc));
   2133 		return 0;
   2134 	}
   2135 
   2136 	return 1;
   2137 }
   2138 
   2139 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
   2140 				     uint64_t note_key)
   2141 {
   2142 	struct ndb_u64_tsid tsid;
   2143 	int rc;
   2144 	MDB_val key, val;
   2145 	MDB_dbi kind_db;
   2146 
   2147 	ndb_u64_tsid_init(&tsid, note->kind, note->created_at);
   2148 
   2149 	key.mv_data = &tsid;
   2150 	key.mv_size = sizeof(tsid);
   2151 	val.mv_data = &note_key;
   2152 	val.mv_size = sizeof(note_key);
   2153 
   2154 	kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
   2155 
   2156 	if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) {
   2157 		ndb_debug("write note kind index to db failed: %s\n",
   2158 				mdb_strerror(rc));
   2159 		return 0;
   2160 	}
   2161 
   2162 	return 1;
   2163 }
   2164 
   2165 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word,
   2166 				   int word_len, int word_index,
   2167 				   uint64_t timestamp, uint64_t note_id)
   2168 {
   2169 	// cap to some reasonable key size
   2170 	unsigned char buffer[1024];
   2171 	int keysize, rc;
   2172 	MDB_val k, v;
   2173 	MDB_dbi text_db;
   2174 
   2175 	// build our compressed text index key
   2176 	if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index,
   2177 				      word_len, word, timestamp, note_id,
   2178 				      &keysize)) {
   2179 		// probably too big
   2180 
   2181 		return 0;
   2182 	}
   2183 
   2184 	k.mv_data = buffer;
   2185 	k.mv_size = keysize;
   2186 
   2187 	v.mv_data = NULL;
   2188 	v.mv_size = 0;
   2189 
   2190 	text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
   2191 
   2192 	if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) {
   2193 		ndb_debug("write note text index to db failed: %s\n",
   2194 				mdb_strerror(rc));
   2195 		return 0;
   2196 	}
   2197 
   2198 	return 1;
   2199 }
   2200 
   2201 
   2202 
   2203 // break a string into individual words for querying or for building the
   2204 // fulltext search index. This is callback based so we don't need to
   2205 // build up an intermediate structure
   2206 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn)
   2207 {
   2208 	int word_len, words;
   2209 	const char *word;
   2210 
   2211 	words = 0;
   2212 
   2213 	while (cur->p < cur->end) {
   2214 		consume_whitespace_or_punctuation(cur);
   2215 		if (cur->p >= cur->end)
   2216 			break;
   2217 		word = (const char *)cur->p;
   2218 
   2219 		if (!consume_until_boundary(cur))
   2220 			break;
   2221 
   2222 		// start of word or end
   2223 		word_len = cur->p - (unsigned char *)word;
   2224 		if (word_len == 0 && cur->p >= cur->end)
   2225 			break;
   2226 
   2227 		if (word_len == 0) {
   2228 			if (!cursor_skip(cur, 1))
   2229 				break;
   2230 			continue;
   2231 		}
   2232 
   2233 		//ndb_debug("writing word index '%.*s'\n", word_len, word);
   2234 
   2235 		if (!fn(ctx, word, word_len, words))
   2236 			continue;
   2237 
   2238 		words++;
   2239 	}
   2240 
   2241 	return 1;
   2242 }
   2243 
   // Context handed to ndb_fulltext_word_writer through the void *ctx
   // argument of ndb_parse_words while a note's content is being indexed.
   2244 struct ndb_word_writer_ctx
   2245 {
   2246 	struct ndb_txn *txn;   // transaction the index entries are written through
   2247 	struct ndb_note *note; // note being indexed; its created_at becomes the key timestamp
   2248 	uint64_t note_id;      // value packed into each index key (ndb_write_note passes the note key)
   2249 };
   2250 
   2251 static int ndb_fulltext_word_writer(void *ctx,
   2252 		const char *word, int word_len, int words)
   2253 {
   2254 	struct ndb_word_writer_ctx *wctx = ctx;
   2255 
   2256 	if (!ndb_write_word_to_index(wctx->txn, word, word_len, words,
   2257 				     wctx->note->created_at, wctx->note_id)) {
   2258 		// too big to write this one, just skip it
   2259 		ndb_debug("failed to write word '%.*s' to index\n", word_len, word);
   2260 
   2261 		return 0;
   2262 	}
   2263 
   2264 	//fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word);
   2265 	return 1;
   2266 }
   2267 
   2268 static int ndb_write_note_fulltext_index(struct ndb_txn *txn,
   2269 					 struct ndb_note *note,
   2270 					 uint64_t note_id)
   2271 {
   2272 	struct cursor cur;
   2273 	unsigned char *content;
   2274 	struct ndb_str str;
   2275 	struct ndb_word_writer_ctx ctx;
   2276 
   2277 	str = ndb_note_str(note, &note->content);
   2278 	// I don't think this should happen?
   2279 	if (unlikely(str.flag == NDB_PACKED_ID))
   2280 		return 0;
   2281 
   2282 	content = (unsigned char *)str.str;
   2283 
   2284 	make_cursor(content, content + note->content_length, &cur);
   2285 
   2286 	ctx.txn = txn;
   2287 	ctx.note = note;
   2288 	ctx.note_id = note_id;
   2289 
   2290 	ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer);
   2291 
   2292 	return 1;
   2293 }
   2294 
   2295 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index)
   2296 {
   2297 	(void)word_index;
   2298 	struct ndb_search_words *words = ctx;
   2299 	struct ndb_word *word;
   2300 
   2301 	if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS)
   2302 		return 0;
   2303 
   2304 	word = &words->words[words->num_words++];
   2305 	word->word = word_str;
   2306 	word->word_len = word_len;
   2307 
   2308 	return 1;
   2309 }
   2310 
   // Reset `words` to an empty list by zeroing its word count. The entries of
   // the words array itself are left untouched.
   2311 static void ndb_search_words_init(struct ndb_search_words *words)
   2312 {
   2313 	words->num_words = 0;
   2314 }
   2315 
   // Count how many leading bytes of str1 and str2 are equal, ignoring case.
   //
   // Only the first min(len1, len2) bytes are compared, so neither string
   // needs to be NUL terminated. Returns 0 when either length is <= 0.
   //
   // Bytes are converted to unsigned char before being handed to tolower():
   // passing a negative char (any UTF-8 continuation/lead byte on platforms
   // where char is signed) is undefined behaviour.
   static int prefix_count(const char *str1, int len1, const char *str2, int len2) {
   	int i;
   	int min_len = len1 < len2 ? len1 : len2;

   	for (i = 0; i < min_len; i++) {
   		// case insensitive
   		if (tolower((unsigned char)str1[i]) !=
   		    tolower((unsigned char)str2[i]))
   			break;
   	}

   	// i is 0 if the loop never ran, otherwise the number of matches
   	return i;
   }
   2330 
   // Debug helper: print a text search key to stdout in the form
   // K<'word' word_index timestamp note_id:N>, with no trailing newline.
   2331 static void ndb_print_text_search_key(struct ndb_text_search_key *key)
   2332 {
   2333 	printf("K<'%.*s' %d %" PRIu64 " note_id:%" PRIu64 ">", key->str_len, key->str,
   2334 						    key->word_index,
   2335 						    key->timestamp,
   2336 						    key->note_id);
   2337 }
   2338 
   2339 static int ndb_prefix_matches(struct ndb_text_search_result *result,
   2340 			      struct ndb_word *search_word)
   2341 {
   2342 	// Empty strings shouldn't happen but let's
   2343 	if (result->key.str_len < 2 || search_word->word_len < 2)
   2344 		return 0;
   2345 
   2346 	// make sure we at least have two matching prefix characters. exact
   2347 	// matches are nice but range searches allow us to match prefixes as
   2348 	// well. A double-char prefix is suffient, but maybe we could up this
   2349 	// in the future.
   2350 	// 
   2351 	// TODO: How are we handling utf-8 prefix matches like
   2352 	// japanese?
   2353 	//
   2354 	if (   result->key.str[0] != tolower(search_word->word[0])
   2355 	    && result->key.str[1] != tolower(search_word->word[1])
   2356 	    )
   2357 		return 0;
   2358 
   2359 	// count the number of prefix-matched characters. This will be used
   2360 	// for ranking search results
   2361 	result->prefix_chars = prefix_count(result->key.str,
   2362 					    result->key.str_len,
   2363 					    search_word->word,
   2364 					    search_word->word_len);
   2365 
   2366 	if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 
   2367 		return 0;
   2368 
   2369 	return 1;
   2370 }
   2371 
   2372 // This is called when scanning the full text search index. Scanning stops
   2373 // when we no longer have a prefix match for the word
   2374 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op,
   2375 	MDB_val *k, struct ndb_word *search_word,
   2376 	struct ndb_text_search_result *last_result,
   2377 	struct ndb_text_search_result *result,
   2378 	MDB_cursor_op order_op)
   2379 {
   2380 	struct cursor key_cursor;
   2381 	//struct ndb_text_search_key search_key;
   2382 	MDB_val v;
   2383 	int retries;
   2384 	retries = -1;
   2385 
   2386 	make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor);
   2387 
   2388 	// When op is MDB_SET_RANGE, this initializes the search. Position
   2389 	// the cursor at the next key greater than or equal to the specified
   2390 	// key.
   2391 	//
   2392 	// Subsequent searches should use MDB_NEXT
   2393 	if (mdb_cursor_get(cursor, k, &v, op)) {
   2394 		// we should only do this if we're going in reverse
   2395 		if (op == MDB_SET_RANGE && order_op == MDB_PREV) {
   2396 			// if set range worked and our key exists, it should be
   2397 			// the one right before this one
   2398 			if (mdb_cursor_get(cursor, k, &v, MDB_PREV))
   2399 				return 0;
   2400 		} else {
   2401 			return 0;
   2402 		}
   2403 	}
   2404 
   2405 retry:
   2406 	retries++;
   2407 	/*
   2408 	printf("continuing from ");
   2409 	if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) {
   2410 		ndb_print_text_search_key(&search_key);
   2411 	} else { printf("??"); }
   2412 	printf("\n");
   2413 	*/
   2414 
   2415 	make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor);
   2416 
   2417 	if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) {
   2418 		fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n");
   2419 		return 0;
   2420 	}
   2421 
   2422 	if (last_result) {
   2423 		if (last_result->key.note_id != result->key.note_id)
   2424 			return 0;
   2425 	}
   2426 
   2427 	// On success, this could still be not related at all.
   2428 	// It could just be adjacent to the word. Let's check
   2429 	// if we have a matching prefix at least.
   2430 
   2431 	// Before we unpack the entire key, let's quickly
   2432 	// unpack just the string to check the prefix. We don't
   2433 	// need to unpack the entire key if the prefix doesn't
   2434 	// match
   2435 	if (!ndb_unpack_text_search_key_string(&key_cursor,
   2436 					       &result->key.str,
   2437 					       &result->key.str_len)) {
   2438 		// this should never happen
   2439 		fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n");
   2440 		return 0;
   2441 	}
   2442 
   2443 	if (!ndb_prefix_matches(result, search_word)) {
   2444 		/*
   2445 		printf("result prefix '%.*s' didn't match search word '%.*s'\n",
   2446 			result->key.str_len, result->key.str,
   2447 			search_word->word_len, search_word->word);
   2448 			*/
   2449 		// we should only do this if we're going in reverse
   2450 		if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) {
   2451 			// if set range worked and our key exists, it should be
   2452 			// the one right before this one
   2453 			mdb_cursor_get(cursor, k, &v, MDB_PREV);
   2454 			goto retry;
   2455 		} else {
   2456 			return 0;
   2457 		}
   2458 	}
   2459 
   2460 	// Unpack the remaining text search key, we will need this information
   2461 	// when building up our search results.
   2462 	if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) {
   2463 		// This should never happen
   2464 		fprintf(stderr, "UNUSUAL: failed to unpack text search key\n");
   2465 		return 0;
   2466 	}
   2467 
   2468 	if (last_result) {
   2469 		if (result->key.word_index < last_result->key.word_index) {
   2470 			/*
   2471 			fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n",
   2472 					result->key.str_len, result->key.str,
   2473 					last_result->key.str_len, last_result->key.str);
   2474 					*/
   2475 			return 0;
   2476 		}
   2477 	}
   2478 
   2479 	return 1;
   2480 }
   2481 
   // Reset `results` to hold no results by zeroing its result count.
   2482 static void ndb_text_search_results_init(
   2483 		struct ndb_text_search_results *results) {
   2484 	results->num_results = 0;
   2485 }
   2486 
   // Fill `cfg` with the default search settings: descending order and a
   // limit of MAX_TEXT_SEARCH_RESULTS.
   2487 void ndb_default_text_search_config(struct ndb_text_search_config *cfg)
   2488 {
   2489 	cfg->order = NDB_ORDER_DESCENDING;
   2490 	cfg->limit = MAX_TEXT_SEARCH_RESULTS;
   2491 }
   2492 
   // Set the result ordering stored in `cfg`.
   2493 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg,
   2494 				     enum ndb_search_order order)
   2495 {
   2496 	cfg->order = order;
   2497 }
   2498 
   // Set the result limit stored in `cfg`. The value is stored as given;
   // ndb_text_search caps it at MAX_TEXT_SEARCH_RESULTS when it runs.
   2499 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit)
   2500 {
   2501 	cfg->limit = limit;
   2502 }
   2503 
   2504 int ndb_text_search(struct ndb_txn *txn, const char *query,
   2505 		    struct ndb_text_search_results *results,
   2506 		    struct ndb_text_search_config *config)
   2507 {
   2508 	unsigned char buffer[1024], *buf;
   2509 	unsigned char saved_buf[1024], *saved;
   2510 	struct ndb_text_search_result *result, *last_result;
   2511 	struct ndb_text_search_result candidate, last_candidate;
   2512 	struct ndb_search_words search_words;
   2513 	//struct ndb_text_search_key search_key;
   2514 	struct ndb_word *search_word;
   2515 	struct cursor cur;
   2516 	ndb_text_search_key_order_fn key_order_fn;
   2517 	MDB_dbi text_db;
   2518 	MDB_cursor *cursor;
   2519 	MDB_val k, v;
   2520 	int i, j, keysize, saved_size, limit;
   2521 	MDB_cursor_op op, order_op;
   2522 	//int num_note_ids;
   2523 
   2524 	saved = NULL;
   2525 	ndb_text_search_results_init(results);
   2526 	ndb_search_words_init(&search_words);
   2527 
   2528 	// search config
   2529 	limit = MAX_TEXT_SEARCH_RESULTS;
   2530 	order_op = MDB_PREV;
   2531 	key_order_fn = ndb_make_text_search_key_high;
   2532 	if (config) {
   2533 		if (config->order == NDB_ORDER_ASCENDING) {
   2534 			order_op = MDB_NEXT;
   2535 			key_order_fn = ndb_make_text_search_key_low;
   2536 		}
   2537 		limit = min(limit, config->limit);
   2538 	}
   2539 	// end search config
   2540 	
   2541 	text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
   2542 	make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur);
   2543 
   2544 	ndb_parse_words(&cur, &search_words, ndb_parse_search_words);
   2545 	if (search_words.num_words == 0)
   2546 		return 0;
   2547 
   2548 	if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) {
   2549 		fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i);
   2550 		return 0;
   2551 	}
   2552 
   2553 	// for each word, we recursively find all of the submatches
   2554 	while (results->num_results < limit) {
   2555 		last_result = NULL;
   2556 		result = &results->results[results->num_results];
   2557 
   2558 		// if we have saved, then we continue from the last root search
   2559 		// sequence
   2560 		if (saved) {
   2561 			buf = saved_buf;
   2562 			saved = NULL;
   2563 			keysize = saved_size;
   2564 
   2565 			k.mv_data = buf;
   2566 			k.mv_size = saved_size;
   2567 
   2568 			// reposition the cursor so we can continue
   2569 			if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE))
   2570 				break;
   2571 
   2572 			op = order_op;
   2573 		} else {
   2574 			// construct a packed fulltext search key using this
   2575 			// word this key doesn't contain any timestamp or index
   2576 			// info, so it should range match instead of exact
   2577 			// match
   2578 			if (!key_order_fn(buffer, sizeof(buffer),
   2579 					  search_words.words[0].word_len,
   2580 					  search_words.words[0].word, &keysize))
   2581 			{
   2582 				// word is too big to fit in 1024-sized key
   2583 				continue;
   2584 			}
   2585 
   2586 			buf = buffer;
   2587 			op = MDB_SET_RANGE;
   2588 		}
   2589 
   2590 		for (j = 0; j < search_words.num_words; j++) {
   2591 			search_word = &search_words.words[j];
   2592 
   2593 			// shouldn't happen but let's be defensive a bit
   2594 			if (search_word->word_len == 0)
   2595 				continue;
   2596 
   2597 			// if we already matched a note in this phrase, make
   2598 			// sure we're including the note id in the query
   2599 			if (last_result) {
   2600 				// we are narrowing down a search.
   2601 				// if we already have this note id, just continue
   2602 				for (i = 0; i < results->num_results; i++) {
   2603 					if (results->results[i].key.note_id == last_result->key.note_id)
   2604 						goto cont;
   2605 				}
   2606 
   2607 				if (!ndb_make_noted_text_search_key(
   2608 					buffer, sizeof(buffer),
   2609 					search_word->word_len,
   2610 					search_word->word,
   2611 					last_result->key.timestamp,
   2612 					last_result->key.note_id,
   2613 					&keysize))
   2614 				{
   2615 					continue;
   2616 				}
   2617 
   2618 				buf = buffer;
   2619 			}
   2620 
   2621 			k.mv_data = buf;
   2622 			k.mv_size = keysize;
   2623 
   2624 			if (!ndb_text_search_next_word(cursor, op, &k,
   2625 						       search_word,
   2626 						       last_result,
   2627 						       &candidate,
   2628 						       order_op)) {
   2629 				break;
   2630 			}
   2631 
   2632 			*result = candidate;
   2633 			op = MDB_SET_RANGE;
   2634 
   2635 			// save the first key match, since we will continue from
   2636 			// this on the next root word result
   2637 			if (j == 0 && !saved) {
   2638 				memcpy(saved_buf, k.mv_data, k.mv_size);
   2639 				saved = saved_buf;
   2640 				saved_size = k.mv_size;
   2641 			}
   2642 
   2643 			last_candidate = *result;
   2644 			last_result = &last_candidate;
   2645 		}
   2646 
   2647 cont:
   2648 		// we matched all of the queries!
   2649 		if (j == search_words.num_words) {
   2650 			results->num_results++;
   2651 		} else if (j == 0) {
   2652 			break;
   2653 		}
   2654 
   2655 	}
   2656 
   2657 	mdb_cursor_close(cursor);
   2658 
   2659 	return 1;
   2660 }
   2661 
   2662 static uint64_t ndb_write_note(struct ndb_txn *txn,
   2663 			       struct ndb_writer_note *note)
   2664 {
   2665 	int rc;
   2666 	uint64_t note_key;
   2667 	MDB_dbi note_db;
   2668 	MDB_val key, val;
   2669 
   2670 	// let's quickly sanity check if we already have this note
   2671 	if (ndb_get_notekey_by_id(txn, note->note->id))
   2672 		return 0;
   2673 	
   2674 	// get dbs
   2675 	note_db = txn->lmdb->dbs[NDB_DB_NOTE];
   2676 
   2677 	// get new key
   2678 	note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1;
   2679 
   2680 	// write note to event store
   2681 	key.mv_data = &note_key;
   2682 	key.mv_size = sizeof(note_key);
   2683 	val.mv_data = note->note;
   2684 	val.mv_size = note->note_len;
   2685 
   2686 	if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) {
   2687 		ndb_debug("write note to db failed: %s\n", mdb_strerror(rc));
   2688 		return 0;
   2689 	}
   2690 
   2691 	// write id index key clustered with created_at
   2692 	if (!ndb_write_note_id_index(txn, note->note, note_key))
   2693 		return 0;
   2694 
   2695 	// write note kind index
   2696 	if (!ndb_write_note_kind_index(txn, note->note, note_key))
   2697 		return 0;
   2698 
   2699 	// only do fulltext index on text and longform notes
   2700 	if (note->note->kind == 1 || note->note->kind == 30023) {
   2701 		if (!ndb_write_note_fulltext_index(txn, note->note, note_key))
   2702 			return 0;
   2703 	}
   2704 
   2705 	if (note->note->kind == 7) {
   2706 		ndb_write_reaction_stats(txn, note->note);
   2707 	}
   2708 
   2709 	return note_key;
   2710 }
   2711 
   2712 // only to be called from the writer thread
   2713 static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
   2714 {
   2715 	int rc;
   2716 	MDB_val key, val;
   2717 	uint64_t version_key;
   2718 
   2719 	version_key = NDB_META_KEY_VERSION;
   2720 	
   2721 	key.mv_data = &version_key;
   2722 	key.mv_size = sizeof(version_key);
   2723 	val.mv_data = &version;
   2724 	val.mv_size = sizeof(version);
   2725 
   2726 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
   2727 		ndb_debug("write version to ndb_meta failed: %s\n",
   2728 				mdb_strerror(rc));
   2729 		return;
   2730 	}
   2731 
   2732 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
   2733 }
   2734 
   // Writer thread entry point; `data` is the struct ndb_writer.
   //
   // Each pass pops up to THREAD_QUEUE_BATCH messages from the writer inbox,
   // applies them inside one write transaction, ends that transaction and
   // then frees the note buffers carried by the messages. The loop exits
   // after a batch that contained NDB_WRITER_QUIT. Always returns NULL.
   2735 static void *ndb_writer_thread(void *data)
   2736 {
   2737 	struct ndb_writer *writer = data;
   2738 	struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
   2739 	int i, popped, done, any_note;
   2740 	uint64_t note_nkey;
   2741 	MDB_txn *mdb_txn = NULL;
   2742 	struct ndb_txn txn;
   2743 	ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
   2744 
   2745 	done = 0;
   2746 	while (!done) {
   2747 		txn.mdb_txn = NULL;
   2748 		popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
   2749 		//ndb_debug("writer popped %d items\n", popped);
   2750 
        		// any_note is set by every message type except QUIT, i.e.
        		// whenever the batch contains something to write
   2751 		any_note = 0;
   2752 		for (i = 0 ; i < popped; i++) {
   2753 			msg = &msgs[i];
   2754 			switch (msg->type) {
   2755 			case NDB_WRITER_NOTE: any_note = 1; break;
   2756 			case NDB_WRITER_PROFILE: any_note = 1; break;
   2757 			case NDB_WRITER_DBMETA: any_note = 1; break;
   2758 			case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break;
   2759 			case NDB_WRITER_QUIT: break;
   2760 			}
   2761 		}
   2762 
        		// a transaction is only opened when there is work for it
   2763 		if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn))
   2764 		{
   2765 			fprintf(stderr, "writer thread txn_begin failed");
   2766 			// should definitely not happen unless DB is full
   2767 			// or something ?
   2768 			assert(false);
   2769 		}
   2770 
   2771 		for (i = 0; i < popped; i++) {
   2772 			msg = &msgs[i];
   2773 
   2774 			switch (msg->type) {
   2775 			case NDB_WRITER_QUIT:
   2776 				// quits are handled before this
        				// the rest of the batch is still processed;
        				// the outer loop ends once it is done
   2777 				done = 1;
   2778 				continue;
   2779 			case NDB_WRITER_PROFILE:
   2780 				note_nkey = 
   2781 					ndb_write_note(&txn, &msg->note);
   2782 				if (msg->profile.record.builder) {
   2783 					// only write if parsing didn't fail
        					// NOTE(review): note_nkey is 0 when
        					// ndb_write_note refused the note; the
        					// profile is still written with that key
   2784 					ndb_write_profile(&txn, &msg->profile,
   2785 							  note_nkey);
   2786 				}
   2787 				break;
   2788 			case NDB_WRITER_NOTE:
   2789 				ndb_write_note(&txn, &msg->note);
   2790 				//printf("wrote note ");
   2791 				//print_hex(msg->note.note->id, 32);
   2792 				//printf("\n");
   2793 				break;
   2794 			case NDB_WRITER_DBMETA:
   2795 				ndb_write_version(&txn, msg->ndb_meta.version);
   2796 				break;
   2797 			case NDB_WRITER_PROFILE_LAST_FETCH:
   2798 				ndb_writer_last_profile_fetch(&txn,
   2799 						msg->last_fetch.pubkey,
   2800 						msg->last_fetch.fetched_at
   2801 						);
   2802 				break;
   2803 			}
   2804 		}
   2805 
   2806 		// commit writes
   2807 		if (any_note && !ndb_end_query(&txn)) {
   2808 			fprintf(stderr, "writer thread txn commit failed");
   2809 			assert(false);
   2810 		}
   2811 
   2812 		// free notes
        		// the note buffers (and profile record builders) carried
        		// by the messages are released here, after the commit
   2813 		for (i = 0; i < popped; i++) {
   2814 			msg = &msgs[i];
   2815 			if (msg->type == NDB_WRITER_NOTE)
   2816 				free(msg->note.note);
   2817 			else if (msg->type == NDB_WRITER_PROFILE) {
   2818 				free(msg->profile.note.note);
   2819 				ndb_profile_record_builder_free(&msg->profile.record);
   2820 			}
   2821 		}
   2822 	}
   2823 
   2824 	ndb_debug("quitting writer thread\n");
   2825 	return NULL;
   2826 }
   2827 
   2828 static void *ndb_ingester_thread(void *data)
   2829 {
   2830 	secp256k1_context *ctx;
   2831 	struct thread *thread = data;
   2832 	struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
   2833 	struct ndb_lmdb *lmdb = ingester->writer->lmdb;
   2834 	struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
   2835 	struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
   2836 	int i, to_write, popped, done, any_event;
   2837 	MDB_txn *read_txn = NULL;
   2838 	int rc;
   2839 
   2840 	ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
   2841 	ndb_debug("started ingester thread\n");
   2842 
   2843 	done = 0;
   2844 	while (!done) {
   2845 		to_write = 0;
   2846 		any_event = 0;
   2847 
   2848 		popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
   2849 		//ndb_debug("ingester popped %d items\n", popped);
   2850 
   2851 		for (i = 0; i < popped; i++) {
   2852 			msg = &msgs[i];
   2853 			if (msg->type == NDB_INGEST_EVENT) {
   2854 				any_event = 1;
   2855 				break;
   2856 			}
   2857 		}
   2858 
   2859 		if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) {
   2860 			// this is bad
   2861 			fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n",
   2862 					mdb_strerror(rc));
   2863 			continue;
   2864 		}
   2865 
   2866 		for (i = 0; i < popped; i++) {
   2867 			msg = &msgs[i];
   2868 			switch (msg->type) {
   2869 			case NDB_INGEST_QUIT:
   2870 				done = 1;
   2871 				break;
   2872 
   2873 			case NDB_INGEST_EVENT:
   2874 				out = &outs[to_write];
   2875 				if (ndb_ingester_process_event(ctx, ingester,
   2876 							       &msg->event, out,
   2877 							       read_txn)) {
   2878 					to_write++;
   2879 				}
   2880 			}
   2881 		}
   2882 
   2883 		if (any_event)
   2884 			mdb_txn_abort(read_txn);
   2885 
   2886 		if (to_write > 0) {
   2887 			//ndb_debug("pushing %d events to write queue\n", to_write); 
   2888 			if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) {
   2889 				ndb_debug("failed pushing %d events to write queue\n", to_write); 
   2890 			}
   2891 		}
   2892 	}
   2893 
   2894 	ndb_debug("quitting ingester thread\n");
   2895 	secp256k1_context_destroy(ctx);
   2896 	return NULL;
   2897 }
   2898 
   2899 
   2900 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb)
   2901 {
   2902 	writer->lmdb = lmdb;
   2903 	writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
   2904 	writer->queue_buf = malloc(writer->queue_buflen);
   2905 	if (writer->queue_buf == NULL) {
   2906 		fprintf(stderr, "ndb: failed to allocate space for writer queue");
   2907 		return 0;
   2908 	}
   2909 
   2910 	// init the writer queue.
   2911 	prot_queue_init(&writer->inbox, writer->queue_buf,
   2912 			writer->queue_buflen, sizeof(struct ndb_writer_msg));
   2913 
   2914 	// spin up the writer thread
   2915 	if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer))
   2916 	{
   2917 		fprintf(stderr, "ndb writer thread failed to create\n");
   2918 		return 0;
   2919 	}
   2920 	
   2921 	return 1;
   2922 }
   2923 
   2924 // initialize the ingester queue and then spawn the thread
   2925 static int ndb_ingester_init(struct ndb_ingester *ingester,
   2926 			     struct ndb_writer *writer,
   2927 			     struct ndb_config *config)
   2928 {
   2929 	int elem_size, num_elems;
   2930 	static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT };
   2931 
   2932 	// TODO: configurable queue sizes
   2933 	elem_size = sizeof(struct ndb_ingester_msg);
   2934 	num_elems = DEFAULT_QUEUE_SIZE;
   2935 
   2936 	ingester->writer = writer;
   2937 	ingester->flags = config->flags;
   2938 	ingester->filter = config->ingest_filter;
   2939 	ingester->filter_context = config->filter_context;
   2940 
   2941 	if (!threadpool_init(&ingester->tp, config->ingester_threads,
   2942 			     elem_size, num_elems, &quit_msg, ingester,
   2943 			     ndb_ingester_thread))
   2944 	{
   2945 		fprintf(stderr, "ndb ingester threadpool failed to init\n");
   2946 		return 0;
   2947 	}
   2948 
   2949 	return 1;
   2950 }
   2951 
   2952 static int ndb_writer_destroy(struct ndb_writer *writer)
   2953 {
   2954 	struct ndb_writer_msg msg;
   2955 
   2956 	// kill thread
   2957 	msg.type = NDB_WRITER_QUIT;
   2958 	if (!prot_queue_push(&writer->inbox, &msg)) {
   2959 		// queue is too full to push quit message. just kill it.
   2960 		pthread_exit(&writer->thread_id);
   2961 	} else {
   2962 		pthread_join(writer->thread_id, NULL);
   2963 	}
   2964 
   2965 	// cleanup
   2966 	prot_queue_destroy(&writer->inbox);
   2967 
   2968 	free(writer->queue_buf);
   2969 
   2970 	return 1;
   2971 }
   2972 
   // Tear down the ingester's thread pool with threadpool_destroy.
   // Always returns 1.
   2973 static int ndb_ingester_destroy(struct ndb_ingester *ingester)
   2974 {
   2975 	threadpool_destroy(&ingester->tp);
   2976 	return 1;
   2977 }
   2978 
   2979 static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
   2980 				    char *json, unsigned len, unsigned client)
   2981 {
   2982 	struct ndb_ingester_msg msg;
   2983 	msg.type = NDB_INGEST_EVENT;
   2984 
   2985 	msg.event.json = json;
   2986 	msg.event.len = len;
   2987 	msg.event.client = client;
   2988 
   2989 	return threadpool_dispatch(&ingester->tp, &msg);
   2990 }
   2991 
   2992 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize)
   2993 {
   2994 	int rc;
   2995 	MDB_txn *txn;
   2996 
   2997 	if ((rc = mdb_env_create(&lmdb->env))) {
   2998 		fprintf(stderr, "mdb_env_create failed, error %d\n", rc);
   2999 		return 0;
   3000 	}
   3001 
   3002 	if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) {
   3003 		fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
   3004 		return 0;
   3005 	}
   3006 
   3007 	if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) {
   3008 		fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc);
   3009 		return 0;
   3010 	}
   3011 
   3012 	if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) {
   3013 		fprintf(stderr, "mdb_env_open failed, error %d\n", rc);
   3014 		return 0;
   3015 	}
   3016 
   3017 	// Initialize DBs
   3018 	if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) {
   3019 		fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc);
   3020 		return 0;
   3021 	}
   3022 
   3023 	// note flatbuffer db
   3024 	if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) {
   3025 		fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc);
   3026 		return 0;
   3027 	}
   3028 
   3029 	// note metadata db
   3030 	if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) {
   3031 		fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc);
   3032 		return 0;
   3033 	}
   3034 
   3035 	// profile flatbuffer db
   3036 	if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) {
   3037 		fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc);
   3038 		return 0;
   3039 	}
   3040 
   3041 	// profile search db
   3042 	if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) {
   3043 		fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc);
   3044 		return 0;
   3045 	}
   3046 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp);
   3047 
   3048 	// ndb metadata (db version, etc)
   3049 	if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) {
   3050 		fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc);
   3051 		return 0;
   3052 	}
   3053 
   3054 	// profile last fetches
   3055 	if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) {
   3056 		fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc);
   3057 		return 0;
   3058 	}
   3059 
   3060 	// id+ts index flags
   3061 	unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED;
   3062 
   3063 	// index dbs
   3064 	if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) {
   3065 		fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
   3066 		return 0;
   3067 	}
   3068 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);
   3069 
   3070 	if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
   3071 		fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
   3072 		return 0;
   3073 	}
   3074 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);
   3075 
   3076 	if ((rc = mdb_dbi_open(txn, "note_kind", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_KIND]))) {
   3077 		fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
   3078 		return 0;
   3079 	}
   3080 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare);
   3081 
   3082 	if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT,
   3083 			       &lmdb->dbs[NDB_DB_NOTE_TEXT]))) {
   3084 		fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
   3085 		return 0;
   3086 	}
   3087 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare);
   3088 
   3089 	// Commit the transaction
   3090 	if ((rc = mdb_txn_commit(txn))) {
   3091 		fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
   3092 		return 0;
   3093 	}
   3094 
   3095 	return 1;
   3096 }
   3097 
   3098 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version)
   3099 {
   3100 	struct ndb_writer_msg msg;
   3101 	msg.type = NDB_WRITER_DBMETA;
   3102 	msg.ndb_meta.version = version;
   3103 	return ndb_writer_queue_msg(&ndb->writer, &msg);
   3104 }
   3105 
   3106 static int ndb_run_migrations(struct ndb *ndb)
   3107 {
   3108 	int64_t version, latest_version, i;
   3109 	
   3110 	latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
   3111 
   3112 	if ((version = ndb_db_version(ndb)) == -1) {
   3113 		ndb_debug("run_migrations: no version found, assuming new db\n");
   3114 		version = latest_version;
   3115 
   3116 		// no version found. fresh db?
   3117 		if (!ndb_queue_write_version(ndb, version)) {
   3118 			fprintf(stderr, "run_migrations: failed writing db version");
   3119 			return 0;
   3120 		}
   3121 
   3122 		return 1;
   3123 	} else {
   3124 		ndb_debug("ndb: version %" PRIu64 " found\n", version);
   3125 	}
   3126 
   3127 	if (version < latest_version)
   3128 		ndb_debug("nostrdb: migrating v%d -> v%d\n",
   3129 				(int)version, (int)latest_version);
   3130 
   3131 	for (i = version; i < latest_version; i++) {
   3132 		if (!MIGRATIONS[i].fn(ndb)) {
   3133 			fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1));
   3134 			return 0;
   3135 		}
   3136 
   3137 		if (!ndb_queue_write_version(ndb, i+1)) {
   3138 			fprintf(stderr, "run_migrations: failed writing db version");
   3139 			return 0;
   3140 		}
   3141 
   3142 		version = i+1;
   3143 	}
   3144 
   3145 	ndb->version = version;
   3146 
   3147 	return 1;
   3148 }
   3149 
   3150 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config)
   3151 {
   3152 	struct ndb *ndb;
   3153 	//MDB_dbi ind_id; // TODO: ind_pk, etc
   3154 
   3155 	ndb = *pndb = calloc(1, sizeof(struct ndb));
   3156 	ndb->flags = config->flags;
   3157 
   3158 	if (ndb == NULL) {
   3159 		fprintf(stderr, "ndb_init: malloc failed\n");
   3160 		return 0;
   3161 	}
   3162 
   3163 	if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
   3164 		return 0;
   3165 
   3166 	if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) {
   3167 		fprintf(stderr, "ndb_writer_init failed\n");
   3168 		return 0;
   3169 	}
   3170 
   3171 	if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
   3172 		fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
   3173 				config->ingester_threads);
   3174 		return 0;
   3175 	}
   3176 
   3177 	if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) &&
   3178 			  !ndb_run_migrations(ndb)) {
   3179 		fprintf(stderr, "failed to run migrations\n");
   3180 		return 0;
   3181 	}
   3182 
   3183 	// Initialize LMDB environment and spin up threads
   3184 	return 1;
   3185 }
   3186 
   3187 void ndb_destroy(struct ndb *ndb)
   3188 {
   3189 	if (ndb == NULL)
   3190 		return;
   3191 
   3192 	// ingester depends on writer and must be destroyed first
   3193 	ndb_ingester_destroy(&ndb->ingester);
   3194 	ndb_writer_destroy(&ndb->writer);
   3195 
   3196 	mdb_env_close(ndb->lmdb.env);
   3197 
   3198 	free(ndb);
   3199 }
   3200 
   3201 // Process a nostr event from a client
   3202 //
   3203 // ie: ["EVENT", {"content":"..."} ...]
   3204 //
   3205 // The client-sent variation of ndb_process_event
   3206 int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
   3207 {
   3208 	// Since we need to return as soon as possible, and we're not
   3209 	// making any assumptions about the lifetime of the string, we
   3210 	// definitely need to copy the json here. In the future once we
   3211 	// have our thread that manages a websocket connection, we can
   3212 	// avoid the copy and just use the buffer we get from that
   3213 	// thread.
   3214 	char *json_copy = strdupn(json, len);
   3215 	if (json_copy == NULL)
   3216 		return 0;
   3217 
   3218 	return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1);
   3219 }
   3220 
   3221 // Process anostr event from a relay,
   3222 //
   3223 // ie: ["EVENT", "subid", {"content":"..."}...]
   3224 // 
   3225 // This function returns as soon as possible, first copying the passed
   3226 // json and then queueing it up for processing. Worker threads then take
   3227 // the json and process it.
   3228 //
   3229 // Processing:
   3230 //
   3231 // 1. The event is parsed into ndb_notes and the signature is validated
   3232 // 2. A quick lookup is made on the database to see if we already have
   3233 //    the note id, if we do we don't need to waste time on json parsing
   3234 //    or note validation.
   3235 // 3. Once validation is done we pass it to the writer queue for writing
   3236 //    to LMDB.
   3237 //
   3238 int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
   3239 {
   3240 	// Since we need to return as soon as possible, and we're not
   3241 	// making any assumptions about the lifetime of the string, we
   3242 	// definitely need to copy the json here. In the future once we
   3243 	// have our thread that manages a websocket connection, we can
   3244 	// avoid the copy and just use the buffer we get from that
   3245 	// thread.
   3246 	char *json_copy = strdupn(json, json_len);
   3247 	if (json_copy == NULL)
   3248 		return 0;
   3249 
   3250 	return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0);
   3251 }
   3252 
   3253 
   3254 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
   3255 {
   3256 	const char *start, *end, *very_end;
   3257 	start = ldjson;
   3258 	end = start + json_len;
   3259 	very_end = ldjson + json_len;
   3260 	int (* process)(struct ndb *, const char *, int);
   3261 #if DEBUG
   3262 	int processed = 0;
   3263 #endif
   3264 	process = client ? ndb_process_client_event : ndb_process_event;
   3265 
   3266 	while ((end = fast_strchr(start, '\n', very_end - start))) {
   3267 		//printf("processing '%.*s'\n", (int)(end-start), start);
   3268 		if (!process(ndb, start, end - start)) {
   3269 			ndb_debug("ndb_process_client_event failed\n");
   3270 			return 0;
   3271 		}
   3272 		start = end + 1;
   3273 #if DEBUG
   3274 		processed++;
   3275 #endif
   3276 	}
   3277 
   3278 #if DEBUG
   3279 	ndb_debug("ndb_process_events: processed %d events\n", processed);
   3280 #endif
   3281 
   3282 	return 1;
   3283 }
   3284 
   3285 int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
   3286 {
   3287 	char *line = NULL;
   3288 	size_t len = 0;
   3289 	ssize_t nread;
   3290 
   3291 	while ((nread = getline(&line, &len, fp)) != -1) {
   3292 		if (line == NULL)
   3293 			break;
   3294 		ndb_process_event(ndb, line, len);
   3295 	}
   3296 
   3297 	if (line)
   3298 		free(line);
   3299 
   3300 	return 1;
   3301 }
   3302 
   3303 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
   3304 {
   3305 	return _ndb_process_events(ndb, ldjson, json_len, 1);
   3306 }
   3307 
   3308 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
   3309 {
   3310 	return _ndb_process_events(ndb, ldjson, json_len, 0);
   3311 }
   3312 
   3313 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
   3314 {
   3315 	return cursor_push_u16(cur, tag->count);
   3316 }
   3317 
   3318 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf,
   3319 		     size_t bufsize)
   3320 {
   3321 	struct ndb_note *note;
   3322 	int half, size, str_indices_size;
   3323 
   3324 	// come on bruh
   3325 	if (bufsize < sizeof(struct ndb_note) * 2)
   3326 		return 0;
   3327 
   3328 	str_indices_size = bufsize / 32;
   3329 	size = bufsize - str_indices_size;
   3330 	half = size / 2;
   3331 
   3332 	//debug("size %d half %d str_indices %d\n", size, half, str_indices_size);
   3333 
   3334 	// make a safe cursor of our available memory
   3335 	make_cursor(buf, buf + bufsize, &builder->mem);
   3336 
   3337 	note = builder->note = (struct ndb_note *)buf;
   3338 
   3339 	// take slices of the memory into subcursors
   3340 	if (!(cursor_slice(&builder->mem, &builder->note_cur, half) &&
   3341 	      cursor_slice(&builder->mem, &builder->strings, half) &&
   3342 	      cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) {
   3343 		return 0;
   3344 	}
   3345 
   3346 	memset(note, 0, sizeof(*note));
   3347 	builder->note_cur.p += sizeof(*note);
   3348 
   3349 	note->strings = builder->strings.start - buf;
   3350 	note->version = 1;
   3351 
   3352 	return 1;
   3353 }
   3354 
   3355 
   3356 
   3357 static inline int ndb_json_parser_init(struct ndb_json_parser *p,
   3358 				       const char *json, int json_len,
   3359 				       unsigned char *buf, int bufsize)
   3360 {
   3361 	int half = bufsize / 2;
   3362 
   3363 	unsigned char *tok_start = buf + half;
   3364 	unsigned char *tok_end = buf + bufsize;
   3365 
   3366 	p->toks = (jsmntok_t*)tok_start;
   3367 	p->toks_end = (jsmntok_t*)tok_end;
   3368 	p->num_tokens = 0;
   3369 	p->json = json;
   3370 	p->json_len = json_len;
   3371 
   3372 	// ndb_builder gets the first half of the buffer, and jsmn gets the
   3373 	// second half. I like this way of alloating memory (without actually
   3374 	// dynamically allocating memory). You get one big chunk upfront and
   3375 	// then submodules can recursively subdivide it. Maybe you could do
   3376 	// something even more clever like golden-ratio style subdivision where
   3377 	// the more important stuff gets a larger chunk and then it spirals
   3378 	// downward into smaller chunks. Thanks for coming to my TED talk.
   3379 
   3380 	if (!ndb_builder_init(&p->builder, buf, half))
   3381 		return 0;
   3382 
   3383 	jsmn_init(&p->json_parser);
   3384 
   3385 	return 1;
   3386 }
   3387 
   3388 static inline int ndb_json_parser_parse(struct ndb_json_parser *p,
   3389 					struct ndb_id_cb *cb)
   3390 {
   3391 	jsmntok_t *tok;
   3392 	int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks);
   3393 	int res =
   3394 		jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL);
   3395 
   3396 	// got an ID!
   3397 	if (res == -42) {
   3398 		tok = &p->toks[p->json_parser.toknext-1];
   3399 
   3400 		switch (cb->fn(cb->data, p->json + tok->start)) {
   3401 		case NDB_IDRES_CONT:
   3402 			res = jsmn_parse(&p->json_parser, p->json, p->json_len,
   3403 					 p->toks, cap, 0);
   3404 			break;
   3405 		case NDB_IDRES_STOP:
   3406 			return -42;
   3407 		}
   3408 	} else if (res == 0) {
   3409 		return 0;
   3410 	}
   3411 
   3412 	p->num_tokens = res;
   3413 	p->i = 0;
   3414 
   3415 	return 1;
   3416 }
   3417 
   3418 static inline int toksize(jsmntok_t *tok)
   3419 {
   3420 	return tok->end - tok->start;
   3421 }
   3422 
   3423 
   3424 
   3425 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2)
   3426 {
   3427 	switch (c2) {
   3428 	case 't':  return cursor_push_byte(cur, '\t');
   3429 	case 'n':  return cursor_push_byte(cur, '\n');
   3430 	case 'r':  return cursor_push_byte(cur, '\r');
   3431 	case 'b':  return cursor_push_byte(cur, '\b');
   3432 	case 'f':  return cursor_push_byte(cur, '\f');
   3433 	case '\\': return cursor_push_byte(cur, '\\');
   3434 	case '/':  return cursor_push_byte(cur, '/');
   3435 	case '"':  return cursor_push_byte(cur, '"');
   3436 	case 'u':
   3437 		// these aren't handled yet
   3438 		return 0;
   3439 	default:
   3440 		return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2);
   3441 	}
   3442 }
   3443 
   3444 static int cursor_push_escaped_char(struct cursor *cur, char c)
   3445 {
   3446         switch (c) {
   3447         case '"':  return cursor_push_str(cur, "\\\"");
   3448         case '\\': return cursor_push_str(cur, "\\\\");
   3449         case '\b': return cursor_push_str(cur, "\\b");
   3450         case '\f': return cursor_push_str(cur, "\\f");
   3451         case '\n': return cursor_push_str(cur, "\\n");
   3452         case '\r': return cursor_push_str(cur, "\\r");
   3453         case '\t': return cursor_push_str(cur, "\\t");
   3454         // TODO: \u hex hex hex hex
   3455         }
   3456         return cursor_push_byte(cur, c);
   3457 }
   3458 
   3459 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len)
   3460 {
   3461 	int i;
   3462 
   3463 	if (len % 2 != 0)
   3464 		return 0;
   3465 
   3466         if (!cursor_push_byte(cur, '"'))
   3467                 return 0;
   3468 
   3469 	for (i = 0; i < len; i++) {
   3470 		unsigned int c = ((const unsigned char *)buf)[i];
   3471 		if (!cursor_push_byte(cur, hexchar(c >> 4)))
   3472 			return 0;
   3473 		if (!cursor_push_byte(cur, hexchar(c & 0xF)))
   3474 			return 0;
   3475 	}
   3476 
   3477         if (!cursor_push_byte(cur, '"'))
   3478                 return 0;
   3479 
   3480 	return 1;
   3481 }
   3482 
   3483 static int cursor_push_jsonstr(struct cursor *cur, const char *str)
   3484 {
   3485 	int i;
   3486         int len;
   3487 
   3488 	len = strlen(str);
   3489 
   3490         if (!cursor_push_byte(cur, '"'))
   3491                 return 0;
   3492 
   3493         for (i = 0; i < len; i++) {
   3494                 if (!cursor_push_escaped_char(cur, str[i]))
   3495                         return 0;
   3496         }
   3497 
   3498         if (!cursor_push_byte(cur, '"'))
   3499                 return 0;
   3500 
   3501         return 1;
   3502 }
   3503 
   3504 
   3505 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str)
   3506 {
   3507 	if (str.flag == NDB_PACKED_ID)
   3508 		return cursor_push_hex_str(cur, str.id, 32);
   3509 
   3510 	return cursor_push_jsonstr(cur, str.str);
   3511 }
   3512 
   3513 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note,
   3514 				struct ndb_tag *tag)
   3515 {
   3516         int i;
   3517 
   3518         if (!cursor_push_byte(cur, '['))
   3519                 return 0;
   3520 
   3521         for (i = 0; i < tag->count; i++) {
   3522                 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i)))
   3523                         return 0;
   3524                 if (i != tag->count-1 && !cursor_push_byte(cur, ','))
   3525 			return 0;
   3526         }
   3527 
   3528         return cursor_push_byte(cur, ']');
   3529 }
   3530 
   3531 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note)
   3532 {
   3533 	int i;
   3534 	struct ndb_iterator iter, *it = &iter;
   3535 	ndb_tags_iterate_start(note, it);
   3536 
   3537         if (!cursor_push_byte(cur, '['))
   3538                 return 0;
   3539 
   3540 	i = 0;
   3541 	while (ndb_tags_iterate_next(it)) {
   3542 		if (!cursor_push_json_tag(cur, note, it->tag))
   3543 			return 0;
   3544                 if (i != note->tags.count-1 && !cursor_push_str(cur, ","))
   3545 			return 0;
   3546 		i++;
   3547 	}
   3548 
   3549         if (!cursor_push_byte(cur, ']'))
   3550                 return 0;
   3551 
   3552 	return 1;
   3553 }
   3554 
   3555 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen)
   3556 {
   3557 	char timebuf[16] = {0};
   3558 	char kindbuf[16] = {0};
   3559 	char pubkey[65];
   3560 	struct cursor cur;
   3561 	int ok;
   3562 
   3563 	if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey))
   3564 		return 0;
   3565 
   3566 	make_cursor(buf, buf + buflen, &cur);
   3567 
   3568 	// TODO: update in 2106 ...
   3569 	snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at);
   3570 	snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind);
   3571 
   3572 	ok =
   3573 		cursor_push_str(&cur, "[0,\"") &&
   3574 		cursor_push_str(&cur, pubkey) &&
   3575 		cursor_push_str(&cur, "\",") &&
   3576 		cursor_push_str(&cur, timebuf) &&
   3577 		cursor_push_str(&cur, ",") &&
   3578 		cursor_push_str(&cur, kindbuf) &&
   3579 		cursor_push_str(&cur, ",") &&
   3580 		cursor_push_json_tags(&cur, ev) &&
   3581 		cursor_push_str(&cur, ",") &&
   3582 		cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) &&
   3583 		cursor_push_str(&cur, "]");
   3584 
   3585 	if (!ok)
   3586 		return 0;
   3587 
   3588 	return cur.p - cur.start;
   3589 }
   3590 
   3591 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) {
   3592 	int len;
   3593 
   3594 	if (!(len = ndb_event_commitment(note, buf, buflen)))
   3595 		return 0;
   3596 
   3597 	//fprintf(stderr, "%.*s\n", len, buf);
   3598 
   3599 	sha256((struct sha256*)note->id, buf, len);
   3600 
   3601 	return 1;
   3602 }
   3603 
   3604 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32],
   3605 		unsigned char sig[64])
   3606 {
   3607 	unsigned char aux[32];
   3608 	secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair;
   3609 
   3610 	if (!fill_random(aux, sizeof(aux)))
   3611 		return 0;
   3612 
   3613 	secp256k1_context *ctx =
   3614 		secp256k1_context_create(SECP256K1_CONTEXT_NONE);
   3615 
   3616 	return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux);
   3617 }
   3618 
   3619 int ndb_create_keypair(struct ndb_keypair *kp)
   3620 {
   3621 	secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair;
   3622 	secp256k1_xonly_pubkey pubkey;
   3623 
   3624 	secp256k1_context *ctx =
   3625 		secp256k1_context_create(SECP256K1_CONTEXT_NONE);;
   3626 
   3627 	/* Try to create a keypair with a valid context, it should only
   3628 	 * fail if the secret key is zero or out of range. */
   3629 	if (!secp256k1_keypair_create(ctx, keypair, kp->secret))
   3630 		return 0;
   3631 
   3632 	if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair))
   3633 		return 0;
   3634 
   3635 	/* Serialize the public key. Should always return 1 for a valid public key. */
   3636 	return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey);
   3637 }
   3638 
   3639 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair)
   3640 {
   3641 	if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) {
   3642 		fprintf(stderr, "could not hex decode secret key\n");
   3643 		return 0;
   3644 	}
   3645 
   3646 	return ndb_create_keypair(keypair);
   3647 }
   3648 
   3649 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note,
   3650 			 struct ndb_keypair *keypair)
   3651 {
   3652 	int strings_len = builder->strings.p - builder->strings.start;
   3653 	unsigned char *note_end = builder->note_cur.p + strings_len;
   3654 	int total_size = note_end - builder->note_cur.start;
   3655 
   3656 	// move the strings buffer next to the end of our ndb_note
   3657 	memmove(builder->note_cur.p, builder->strings.start, strings_len);
   3658 
   3659 	// set the strings location
   3660 	builder->note->strings = builder->note_cur.p - builder->note_cur.start;
   3661 
   3662 	// record the total size
   3663 	//builder->note->size = total_size;
   3664 
   3665 	*note = builder->note;
   3666 
   3667 	// generate id and sign if we're building this manually
   3668 	if (keypair) {
   3669 		// use the remaining memory for building our id buffer
   3670 		unsigned char *end   = builder->mem.end;
   3671 		unsigned char *start = (unsigned char*)(*note) + total_size;
   3672 
   3673 		ndb_builder_set_pubkey(builder, keypair->pubkey);
   3674 
   3675 		if (!ndb_calculate_id(builder->note, start, end - start))
   3676 			return 0;
   3677 
   3678 		if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig))
   3679 			return 0;
   3680 	}
   3681 
   3682 	// make sure we're aligned as a whole
   3683 	total_size = (total_size + 7) & ~7;
   3684 	assert((total_size % 8) == 0);
   3685 	return total_size;
   3686 }
   3687 
   3688 struct ndb_note * ndb_builder_note(struct ndb_builder *builder)
   3689 {
   3690 	return builder->note;
   3691 }
   3692 
   3693 static union ndb_packed_str ndb_offset_str(uint32_t offset)
   3694 {
   3695 	// ensure accidents like -1 don't corrupt our packed_str
   3696 	union ndb_packed_str str;
   3697 	// most significant byte is reserved for ndb_packtype
   3698 	str.offset = offset & 0xFFFFFF;
   3699 	return str;
   3700 }
   3701 
   3702 
   3703 /// find an existing string via str_indices. these indices only exist in the
   3704 /// builder phase just for this purpose.
   3705 static inline int ndb_builder_find_str(struct ndb_builder *builder,
   3706 				       const char *str, int len,
   3707 				       union ndb_packed_str *pstr)
   3708 {
   3709 	// find existing matching string to avoid duplicate strings
   3710 	int indices = cursor_count(&builder->str_indices, sizeof(uint32_t));
   3711 	for (int i = 0; i < indices; i++) {
   3712 		uint32_t index = ((uint32_t*)builder->str_indices.start)[i];
   3713 		const char *some_str = (const char*)builder->strings.start + index;
   3714 
   3715 		if (!memcmp(some_str, str, len)) {
   3716 			// found an existing matching str, use that index
   3717 			*pstr = ndb_offset_str(index);
   3718 			return 1;
   3719 		}
   3720 	}
   3721 
   3722 	return 0;
   3723 }
   3724 
   3725 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str,
   3726 				int len, union ndb_packed_str *pstr)
   3727 {
   3728 	uint32_t loc;
   3729 
   3730 	// no string found, push a new one
   3731 	loc = builder->strings.p - builder->strings.start;
   3732 	if (!(cursor_push(&builder->strings, (unsigned char*)str, len) &&
   3733 	      cursor_push_byte(&builder->strings, '\0'))) {
   3734 		return 0;
   3735 	}
   3736 
   3737 	*pstr = ndb_offset_str(loc);
   3738 
   3739 	// record in builder indices. ignore return value, if we can't cache it
   3740 	// then whatever
   3741 	cursor_push_u32(&builder->str_indices, loc);
   3742 
   3743 	return 1;
   3744 }
   3745 
   3746 static int ndb_builder_push_packed_id(struct ndb_builder *builder,
   3747 				      unsigned char *id,
   3748 				      union ndb_packed_str *pstr)
   3749 {
   3750 	// Don't both find id duplicates. very rarely are they duplicated
   3751 	// and it slows things down quite a bit. If we really care about this
   3752 	// We can switch to a hash table.
   3753 	//if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) {
   3754 	//	pstr->packed.flag = NDB_PACKED_ID;
   3755 	//	return 1;
   3756 	//}
   3757 
   3758 	if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) {
   3759 		pstr->packed.flag = NDB_PACKED_ID;
   3760 		return 1;
   3761 	}
   3762 
   3763 	return 0;
   3764 }
   3765 
   3766 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2)
   3767 {
   3768 	union ndb_packed_str str;
   3769 	str.packed.flag = NDB_PACKED_STR;
   3770 	str.packed.str[0] = c1;
   3771 	str.packed.str[1] = c2;
   3772 	str.packed.str[2] = '\0';
   3773 	return str;
   3774 }
   3775 
   3776 static union ndb_packed_str ndb_char_to_packed_str(char c)
   3777 {
   3778 	union ndb_packed_str str;
   3779 	str.packed.flag = NDB_PACKED_STR;
   3780 	str.packed.str[0] = c;
   3781 	str.packed.str[1] = '\0';
   3782 	return str;
   3783 }
   3784 
   3785 
   3786 /// Check for small strings to pack
   3787 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder,
   3788 					      const char *str, int len,
   3789 					      union ndb_packed_str *pstr,
   3790 					      int pack_ids)
   3791 {
   3792 	unsigned char id_buf[32];
   3793 
   3794 	if (len == 0) {
   3795 		*pstr = ndb_char_to_packed_str(0);
   3796 		return 1;
   3797 	} else if (len == 1) {
   3798 		*pstr = ndb_char_to_packed_str(str[0]);
   3799 		return 1;
   3800 	} else if (len == 2) {
   3801 		*pstr = ndb_chars_to_packed_str(str[0], str[1]);
   3802 		return 1;
   3803 	} else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) {
   3804 		return ndb_builder_push_packed_id(builder, id_buf, pstr);
   3805 	}
   3806 
   3807 	return 0;
   3808 }
   3809 
   3810 
   3811 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder,
   3812 					 const char *str, int len,
   3813 					 union ndb_packed_str *pstr)
   3814 {
   3815 	if (ndb_builder_find_str(builder, str, len, pstr))
   3816 		return 1;
   3817 
   3818 	return ndb_builder_push_str(builder, str, len, pstr);
   3819 }
   3820 
   3821 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len,
   3822 			 union ndb_packed_str *pstr, int pack_ids)
   3823 {
   3824 	if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids))
   3825 		return 1;
   3826 
   3827 	return ndb_builder_push_unpacked_str(builder, str, len, pstr);
   3828 }
   3829 
   3830 int ndb_builder_set_content(struct ndb_builder *builder, const char *content,
   3831 			    int len)
   3832 {
   3833 	int pack_ids = 0;
   3834 	builder->note->content_length = len;
   3835 	return ndb_builder_make_str(builder, content, len,
   3836 				    &builder->note->content, pack_ids);
   3837 }
   3838 
   3839 
   3840 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len,
   3841 			 const char *s)
   3842 {
   3843 	if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len &&
   3844 	    memcmp(json + tok->start, s, tok_len) == 0) {
   3845 		return 1;
   3846 	}
   3847 	return 0;
   3848 }
   3849 
   3850 static int ndb_builder_finalize_tag(struct ndb_builder *builder,
   3851 				    union ndb_packed_str offset)
   3852 {
   3853 	if (!cursor_push_u32(&builder->note_cur, offset.offset))
   3854 		return 0;
   3855 	builder->current_tag->count++;
   3856 	return 1;
   3857 }
   3858 
   3859 /// Unescape and push json strings
   3860 static int ndb_builder_make_json_str(struct ndb_builder *builder,
   3861 				     const char *str, int len,
   3862 				     union ndb_packed_str *pstr,
   3863 				     int *written, int pack_ids)
   3864 {
   3865 	// let's not care about de-duping these. we should just unescape
   3866 	// in-place directly into the strings table. 
   3867 	if (written)
   3868 		*written = len;
   3869 
   3870 	const char *p, *end, *start;
   3871 	unsigned char *builder_start;
   3872 
   3873 	// always try compact strings first
   3874 	if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids))
   3875 		return 1;
   3876 
   3877 	end = str + len;
   3878 	start = str; // Initialize start to the beginning of the string
   3879 
   3880 	*pstr = ndb_offset_str(builder->strings.p - builder->strings.start);
   3881 	builder_start = builder->strings.p;
   3882 
   3883 	for (p = str; p < end; p++) {
   3884 		if (*p == '\\' && p+1 < end) {
   3885 			// Push the chunk of unescaped characters before this escape sequence
   3886 			if (start < p && !cursor_push(&builder->strings,
   3887 						(unsigned char *)start,
   3888 						p - start)) {
   3889 				return 0;
   3890 			}
   3891 
   3892 			if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1)))
   3893 				return 0;
   3894 
   3895 			p++; // Skip the character following the backslash
   3896 			start = p + 1; // Update the start pointer to the next character
   3897 		}
   3898 	}
   3899 
   3900 	// Handle the last chunk after the last escape sequence (or if there are no escape sequences at all)
   3901 	if (start < p && !cursor_push(&builder->strings, (unsigned char *)start,
   3902 				      p - start)) {
   3903 		return 0;
   3904 	}
   3905 
   3906 	if (written)
   3907 		*written = builder->strings.p - builder_start;
   3908 
   3909 	// TODO: dedupe these!?
   3910 	return cursor_push_byte(&builder->strings, '\0');
   3911 }
   3912 
   3913 static int ndb_builder_push_json_tag(struct ndb_builder *builder,
   3914 				     const char *str, int len)
   3915 {
   3916 	union ndb_packed_str pstr;
   3917 	int pack_ids = 1;
   3918 	if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids))
   3919 		return 0;
   3920 	return ndb_builder_finalize_tag(builder, pstr);
   3921 }
   3922 
   3923 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag
   3924 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p,
   3925 					   jsmntok_t *array)
   3926 {
   3927 	jsmntok_t *str_tok;
   3928 	const char *str;
   3929 
   3930 	if (array->size == 0)
   3931 		return 0;
   3932 
   3933 	if (!ndb_builder_new_tag(&p->builder))
   3934 		return 0;
   3935 
   3936 	for (int i = 0; i < array->size; i++) {
   3937 		str_tok = &array[i+1];
   3938 		str = p->json + str_tok->start;
   3939 
   3940 		if (!ndb_builder_push_json_tag(&p->builder, str,
   3941 					       toksize(str_tok))) {
   3942 			return 0;
   3943 		}
   3944 	}
   3945 
   3946 	return 1;
   3947 }
   3948 
   3949 // Push json tags into ndb data
   3950 //   [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags
   3951 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p,
   3952 						jsmntok_t *array)
   3953 {
   3954 	jsmntok_t *tag = array;
   3955 
   3956 	if (array->size == 0)
   3957 		return 1;
   3958 
   3959 	for (int i = 0; i < array->size; i++) {
   3960 		if (!ndb_builder_tag_from_json_array(p, &tag[i+1]))
   3961 			return 0;
   3962 		tag += tag[i+1].size;
   3963 	}
   3964 
   3965 	return 1;
   3966 }
   3967 
   3968 static int parse_unsigned_int(const char *start, int len, unsigned int *num)
   3969 {
   3970 	unsigned int number = 0;
   3971 	const char *p = start, *end = start + len;
   3972 	int digits = 0;
   3973 
   3974 	while (p < end) {
   3975 		char c = *p;
   3976 
   3977 		if (c < '0' || c > '9')
   3978 			break;
   3979 
   3980 		// Check for overflow
   3981 		char digit = c - '0';
   3982 		if (number > (UINT_MAX - digit) / 10)
   3983 			return 0; // Overflow detected
   3984 
   3985 		number = number * 10 + digit;
   3986 
   3987 		p++;
   3988 		digits++;
   3989 	}
   3990 
   3991 	if (digits == 0)
   3992 		return 0;
   3993 
   3994 	*num = number;
   3995 	return 1;
   3996 }
   3997 
   3998 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
   3999 			       unsigned char *buf, int bufsize, struct ndb_id_cb *cb)
   4000 {
   4001 	jsmntok_t *tok = NULL;
   4002 	int tok_len, res;
   4003 	struct ndb_json_parser parser;
   4004 
   4005 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   4006 
   4007 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
   4008 		return res;
   4009 
   4010 	if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
   4011 		return 0;
   4012 
   4013 	parser.i = 1;
   4014 	tok = &parser.toks[parser.i++];
   4015 	tok_len = toksize(tok);
   4016 	if (tok->type != JSMN_STRING)
   4017 		return 0;
   4018 
   4019 	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
   4020 		fce->evtype = NDB_FCE_EVENT;
   4021 		struct ndb_event *ev = &fce->event;
   4022 		return ndb_parse_json_note(&parser, &ev->note);
   4023 	}
   4024 
   4025 	return 0;
   4026 }
   4027 
   4028 
   4029 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
   4030 			   unsigned char *buf, int bufsize,
   4031 			   struct ndb_id_cb *cb)
   4032 {
   4033 	jsmntok_t *tok = NULL;
   4034 	int tok_len, res;
   4035 	struct ndb_json_parser parser;
   4036 
   4037 	tce->subid_len = 0;
   4038 	tce->subid = "";
   4039 
   4040 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   4041 
   4042 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
   4043 		return res;
   4044 
   4045 	if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
   4046 		return 0;
   4047 
   4048 	parser.i = 1;
   4049 	tok = &parser.toks[parser.i++];
   4050 	tok_len = toksize(tok);
   4051 	if (tok->type != JSMN_STRING)
   4052 		return 0;
   4053 
   4054 	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
   4055 		tce->evtype = NDB_TCE_EVENT;
   4056 		struct ndb_event *ev = &tce->event;
   4057 
   4058 		tok = &parser.toks[parser.i++];
   4059 		if (tok->type != JSMN_STRING)
   4060 			return 0;
   4061 
   4062 		tce->subid = json + tok->start;
   4063 		tce->subid_len = toksize(tok);
   4064 
   4065 		return ndb_parse_json_note(&parser, &ev->note);
   4066 	} else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 
   4067 		tce->evtype = NDB_TCE_EOSE;
   4068 
   4069 		tok = &parser.toks[parser.i++];
   4070 		if (tok->type != JSMN_STRING)
   4071 			return 0;
   4072 
   4073 		tce->subid = json + tok->start;
   4074 		tce->subid_len = toksize(tok);
   4075 		return 1;
   4076 	} else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) {
   4077 		if (parser.num_tokens != 5)
   4078 			return 0;
   4079 
   4080 		struct ndb_command_result *cr = &tce->command_result;
   4081 
   4082 		tce->evtype = NDB_TCE_OK;
   4083 
   4084 		tok = &parser.toks[parser.i++];
   4085 		if (tok->type != JSMN_STRING)
   4086 			return 0;
   4087 
   4088 		tce->subid = json + tok->start;
   4089 		tce->subid_len = toksize(tok);
   4090 
   4091 		tok = &parser.toks[parser.i++];
   4092 		if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0)
   4093 			return 0;
   4094 
   4095 		cr->ok = (json + tok->start)[0] == 't';
   4096 
   4097 		tok = &parser.toks[parser.i++];
   4098 		if (tok->type != JSMN_STRING)
   4099 			return 0;
   4100 
   4101 		tce->command_result.msg = json + tok->start;
   4102 		tce->command_result.msglen = toksize(tok);
   4103 
   4104 		return 1;
   4105 	}
   4106 
   4107 	return 0;
   4108 }
   4109 
   4110 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note)
   4111 {
   4112 	jsmntok_t *tok = NULL;
   4113 	unsigned char hexbuf[64];
   4114 	const char *json = parser->json;
   4115 	const char *start;
   4116 	int i, tok_len, parsed;
   4117 
   4118 	parsed = 0;
   4119 
   4120 	if (parser->toks[parser->i].type != JSMN_OBJECT)
   4121 		return 0;
   4122 
   4123 	// TODO: build id buffer and verify at end
   4124 
   4125 	for (i = parser->i + 1; i < parser->num_tokens; i++) {
   4126 		tok = &parser->toks[i];
   4127 		start = json + tok->start;
   4128 		tok_len = toksize(tok);
   4129 
   4130 		//printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type);
   4131 		if (tok_len == 0 || i + 1 >= parser->num_tokens)
   4132 			continue;
   4133 
   4134 		if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) {
   4135 			// pubkey
   4136 			tok = &parser->toks[i+1];
   4137 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   4138 			parsed |= NDB_PARSED_PUBKEY;
   4139 			ndb_builder_set_pubkey(&parser->builder, hexbuf);
   4140 		} else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') {
   4141 			// id
   4142 			tok = &parser->toks[i+1];
   4143 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   4144 			parsed |= NDB_PARSED_ID;
   4145 			ndb_builder_set_id(&parser->builder, hexbuf);
   4146 		} else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') {
   4147 			// sig
   4148 			tok = &parser->toks[i+1];
   4149 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   4150 			parsed |= NDB_PARSED_SIG;
   4151 			ndb_builder_set_sig(&parser->builder, hexbuf);
   4152 		} else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) {
   4153 			// kind
   4154 			tok = &parser->toks[i+1];
   4155 			start = json + tok->start;
   4156 			if (tok->type != JSMN_PRIMITIVE || tok_len <= 0)
   4157 				return 0;
   4158 			if (!parse_unsigned_int(start, toksize(tok),
   4159 						&parser->builder.note->kind))
   4160 					return 0;
   4161 			parsed |= NDB_PARSED_KIND;
   4162 		} else if (start[0] == 'c') {
   4163 			if (jsoneq(json, tok, tok_len, "created_at")) {
   4164 				// created_at
   4165 				tok = &parser->toks[i+1];
   4166 				start = json + tok->start;
   4167 				if (tok->type != JSMN_PRIMITIVE || tok_len <= 0)
   4168 					return 0;
   4169 				// TODO: update to int64 in 2106 ... xD
   4170 				unsigned int bigi;
   4171 				if (!parse_unsigned_int(start, toksize(tok), &bigi))
   4172 					return 0;
   4173 				parser->builder.note->created_at = bigi;
   4174 				parsed |= NDB_PARSED_CREATED_AT;
   4175 			} else if (jsoneq(json, tok, tok_len, "content")) {
   4176 				// content
   4177 				tok = &parser->toks[i+1];
   4178 				union ndb_packed_str pstr;
   4179 				tok_len = toksize(tok);
   4180 				int written, pack_ids = 0;
   4181 				if (!ndb_builder_make_json_str(&parser->builder,
   4182 							json + tok->start,
   4183 							tok_len, &pstr,
   4184 							&written, pack_ids)) {
   4185 					ndb_debug("ndb_builder_make_json_str failed\n");
   4186 					return 0;
   4187 				}
   4188 				parser->builder.note->content_length = written;
   4189 				parser->builder.note->content = pstr;
   4190 				parsed |= NDB_PARSED_CONTENT;
   4191 			}
   4192 		} else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) {
   4193 			tok = &parser->toks[i+1];
   4194 			ndb_builder_process_json_tags(parser, tok);
   4195 			i += tok->size;
   4196 			parsed |= NDB_PARSED_TAGS;
   4197 		}
   4198 	}
   4199 
   4200 	//ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL);
   4201 	if (parsed != NDB_PARSED_ALL)
   4202 		return 0;
   4203 
   4204 	return ndb_builder_finalize(&parser->builder, note, NULL);
   4205 }
   4206 
   4207 int ndb_note_from_json(const char *json, int len, struct ndb_note **note,
   4208 		       unsigned char *buf, int bufsize)
   4209 {
   4210 	struct ndb_json_parser parser;
   4211 	int res;
   4212 
   4213 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   4214 	if ((res = ndb_json_parser_parse(&parser, NULL)) < 0)
   4215 		return res;
   4216 
   4217 	if (parser.num_tokens < 1)
   4218 		return 0;
   4219 
   4220 	return ndb_parse_json_note(&parser, note);
   4221 }
   4222 
   4223 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey)
   4224 {
   4225 	memcpy(builder->note->pubkey, pubkey, 32);
   4226 }
   4227 
   4228 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id)
   4229 {
   4230 	memcpy(builder->note->id, id, 32);
   4231 }
   4232 
   4233 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig)
   4234 {
   4235 	memcpy(builder->note->sig, sig, 64);
   4236 }
   4237 
   4238 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind)
   4239 {
   4240 	builder->note->kind = kind;
   4241 }
   4242 
   4243 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at)
   4244 {
   4245 	builder->note->created_at = created_at;
   4246 }
   4247 
   4248 int ndb_builder_new_tag(struct ndb_builder *builder)
   4249 {
   4250 	builder->note->tags.count++;
   4251 	struct ndb_tag tag = {0};
   4252 	builder->current_tag = (struct ndb_tag *)builder->note_cur.p;
   4253 	return cursor_push_tag(&builder->note_cur, &tag);
   4254 }
   4255 
   4256 void ndb_stat_counts_init(struct ndb_stat_counts *counts)
   4257 {
   4258 	counts->count = 0;
   4259 	counts->key_size = 0;
   4260 	counts->value_size = 0;
   4261 }
   4262 
   4263 static void ndb_stat_init(struct ndb_stat *stat)
   4264 {
   4265 	// init stats
   4266 	int i;
   4267 
   4268 	for (i = 0; i < NDB_CKIND_COUNT; i++) {
   4269 		ndb_stat_counts_init(&stat->common_kinds[i]);
   4270 	}
   4271 
   4272 	for (i = 0; i < NDB_DBS; i++) {
   4273 		ndb_stat_counts_init(&stat->dbs[i]);
   4274 	}
   4275 
   4276 	ndb_stat_counts_init(&stat->other_kinds);
   4277 }
   4278 
   4279 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat)
   4280 {
   4281 	int rc;
   4282 	MDB_cursor *cur;
   4283 	MDB_val k, v;
   4284 	MDB_dbi db;
   4285 	struct ndb_txn txn;
   4286 	struct ndb_note *note;
   4287 	int i;
   4288 	enum ndb_common_kind common_kind;
   4289 
   4290 	// initialize to 0
   4291 	ndb_stat_init(stat);
   4292 
   4293 	if (!ndb_begin_query(ndb, &txn)) {
   4294 		fprintf(stderr, "ndb_stat failed at ndb_begin_query\n");
   4295 		return 0;
   4296 	}
   4297 
   4298 	// stat each dbi in the database
   4299 	for (i = 0; i < NDB_DBS; i++)
   4300 	{
   4301 		db = ndb->lmdb.dbs[i];
   4302 
   4303 		if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) {
   4304 			fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n",
   4305 					mdb_strerror(rc));
   4306 			return 0;
   4307 		}
   4308 
   4309 		// loop over every entry and count kv sizes
   4310 		while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   4311 			// we gather more detailed per-kind stats if we're in
   4312 			// the notes db
   4313 			if (i == NDB_DB_NOTE) {
   4314 				note = v.mv_data;
   4315 				common_kind = ndb_kind_to_common_kind(note->kind);
   4316 
   4317 				// uncommon kind? just count them in bulk
   4318 				if ((int)common_kind == -1) {
   4319 					stat->other_kinds.count++;
   4320 					stat->other_kinds.key_size += k.mv_size;
   4321 					stat->other_kinds.value_size += v.mv_size;
   4322 				} else {
   4323 					stat->common_kinds[common_kind].count++;
   4324 					stat->common_kinds[common_kind].key_size += k.mv_size;
   4325 					stat->common_kinds[common_kind].value_size += v.mv_size;
   4326 				}
   4327 			}
   4328 
   4329 			stat->dbs[i].count++;
   4330 			stat->dbs[i].key_size += k.mv_size;
   4331 			stat->dbs[i].value_size += v.mv_size;
   4332 		}
   4333 
   4334 		// close the cursor, they are per-dbi
   4335 		mdb_cursor_close(cur);
   4336 	}
   4337 
   4338 	ndb_end_query(&txn);
   4339 
   4340 	return 1;
   4341 }
   4342 
   4343 /// Push an element to the current tag
   4344 /// 
   4345 /// Basic idea is to call ndb_builder_new_tag
   4346 inline int ndb_builder_push_tag_str(struct ndb_builder *builder,
   4347 				    const char *str, int len)
   4348 {
   4349 	union ndb_packed_str pstr;
   4350 	int pack_ids = 1;
   4351 	if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids))
   4352 		return 0;
   4353 	return ndb_builder_finalize_tag(builder, pstr);
   4354 }
   4355 
   4356 //
   4357 // CONFIG
   4358 // 
   4359 void ndb_default_config(struct ndb_config *config)
   4360 {
   4361 	int cores = get_cpu_cores();
   4362 	config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB
   4363 	config->ingester_threads = cores == -1 ? 4 : cores;
   4364 	config->flags = 0;
   4365 	config->ingest_filter = NULL;
   4366 	config->filter_context = NULL;
   4367 }
   4368 
   4369 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
   4370 {
   4371 	config->ingester_threads = threads;
   4372 }
   4373 
   4374 void ndb_config_set_flags(struct ndb_config *config, int flags)
   4375 {
   4376 	config->flags = flags;
   4377 }
   4378 
   4379 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize)
   4380 {
   4381 	config->mapsize = mapsize;
   4382 }
   4383 
   4384 void ndb_config_set_ingest_filter(struct ndb_config *config,
   4385 				  ndb_ingest_filter_fn fn, void *filter_ctx)
   4386 {
   4387 	config->ingest_filter = fn;
   4388 	config->filter_context = filter_ctx;
   4389 }
   4390 
   4391 // used by ndb.c
   4392 int ndb_print_search_keys(struct ndb_txn *txn)
   4393 {
   4394 	MDB_cursor *cur;
   4395 	MDB_val k, v;
   4396 	int i;
   4397 	struct ndb_text_search_key search_key;
   4398 
   4399 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur))
   4400 		return 0;
   4401 
   4402 	i = 1;
   4403 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   4404 		if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) {
   4405 			fprintf(stderr, "error decoding key %d\n", i);
   4406 			continue;
   4407 		}
   4408 
   4409 		ndb_print_text_search_key(&search_key);
   4410 		printf("\n");
   4411 
   4412 		i++;
   4413 	}
   4414 
   4415 	return 1;
   4416 }
   4417 
   4418 struct ndb_tags *ndb_note_tags(struct ndb_note *note)
   4419 {
   4420 	return &note->tags;
   4421 }
   4422 
   4423 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr)
   4424 {
   4425 	struct ndb_str str;
   4426 	str.flag = pstr->packed.flag;
   4427 
   4428 	if (str.flag == NDB_PACKED_STR) {
   4429 		str.str = pstr->packed.str;
   4430 		return str;
   4431 	}
   4432 
   4433 	str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF);
   4434 	return str;
   4435 }
   4436 
   4437 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind)
   4438 {
   4439 	return ndb_note_str(note, &tag->strs[ind]);
   4440 }
   4441 
   4442 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind)
   4443 {
   4444 	return ndb_tag_str(iter->note, iter->tag, ind);
   4445 }
   4446 
   4447 unsigned char * ndb_note_id(struct ndb_note *note)
   4448 {
   4449 	return note->id;
   4450 }
   4451 
   4452 unsigned char * ndb_note_pubkey(struct ndb_note *note)
   4453 {
   4454 	return note->pubkey;
   4455 }
   4456 
   4457 unsigned char * ndb_note_sig(struct ndb_note *note)
   4458 {
   4459 	return note->sig;
   4460 }
   4461 
   4462 uint32_t ndb_note_created_at(struct ndb_note *note)
   4463 {
   4464 	return note->created_at;
   4465 }
   4466 
   4467 uint32_t ndb_note_kind(struct ndb_note *note)
   4468 {
   4469 	return note->kind;
   4470 }
   4471 
   4472 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind)
   4473 {
   4474 	note->kind = kind;
   4475 }
   4476 
   4477 const char *ndb_note_content(struct ndb_note *note)
   4478 {
   4479 	return ndb_note_str(note, &note->content).str;
   4480 }
   4481 
   4482 uint32_t ndb_note_content_length(struct ndb_note *note)
   4483 {
   4484 	return note->content_length;
   4485 }
   4486 
   4487 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes)
   4488 {
   4489 	struct ndb_note *note = (struct ndb_note *)bytes;
   4490 	if (note->version != 1)
   4491 		return 0;
   4492 	return note;
   4493 }
   4494 
   4495 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter)
   4496 {
   4497 	iter->note = note;
   4498 	iter->tag = NULL;
   4499 	iter->index = -1;
   4500 }
   4501 
   4502 int ndb_tags_iterate_next(struct ndb_iterator *iter)
   4503 {
   4504 	if (iter->tag == NULL || iter->index == -1) {
   4505 		iter->tag = iter->note->tags.tag;
   4506 		iter->index = 0;
   4507 		return iter->note->tags.count != 0;
   4508 	}
   4509 
   4510 	struct ndb_tags *tags = &iter->note->tags;
   4511 
   4512 	if (++iter->index < tags->count) {
   4513 		uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]);
   4514 		iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size);
   4515 		return 1;
   4516 	}
   4517 
   4518 	return 0;
   4519 }
   4520 
   4521 uint16_t ndb_tags_count(struct ndb_tags *tags)
   4522 {
   4523 	return tags->count;
   4524 }
   4525 
   4526 uint16_t ndb_tag_count(struct ndb_tag *tags)
   4527 {
   4528 	return tags->count;
   4529 }
   4530 
   4531 enum ndb_common_kind ndb_kind_to_common_kind(int kind)
   4532 {
   4533 	switch (kind)
   4534 	{
   4535 		case 0:     return NDB_CKIND_PROFILE;
   4536 		case 1:     return NDB_CKIND_TEXT;
   4537 		case 3:     return NDB_CKIND_CONTACTS;
   4538 		case 4:     return NDB_CKIND_DM;
   4539 		case 5:     return NDB_CKIND_DELETE;
   4540 		case 6:     return NDB_CKIND_REPOST;
   4541 		case 7:     return NDB_CKIND_REACTION;
   4542 		case 9735:  return NDB_CKIND_ZAP;
   4543 		case 9734:  return NDB_CKIND_ZAP_REQUEST;
   4544 		case 23194: return NDB_CKIND_NWC_REQUEST;
   4545 		case 23195: return NDB_CKIND_NWC_RESPONSE;
   4546 		case 27235: return NDB_CKIND_HTTP_AUTH;
   4547 		case 30000: return NDB_CKIND_LIST;
   4548 		case 30023: return NDB_CKIND_LONGFORM;
   4549 		case 30315: return NDB_CKIND_STATUS;
   4550 	}
   4551 
   4552 	return -1;
   4553 }
   4554 
   4555 const char *ndb_kind_name(enum ndb_common_kind ck)
   4556 {
   4557 	switch (ck) {
   4558 		case NDB_CKIND_PROFILE:      return "profile";
   4559 		case NDB_CKIND_TEXT:         return "text";
   4560 		case NDB_CKIND_CONTACTS:     return "contacts";
   4561 		case NDB_CKIND_DM:           return "dm";
   4562 		case NDB_CKIND_DELETE:       return "delete";
   4563 		case NDB_CKIND_REPOST:       return "repost";
   4564 		case NDB_CKIND_REACTION:     return "reaction";
   4565 		case NDB_CKIND_ZAP:          return "zap";
   4566 		case NDB_CKIND_ZAP_REQUEST:  return "zap_request";
   4567 		case NDB_CKIND_NWC_REQUEST:  return "nwc_request";
   4568 		case NDB_CKIND_NWC_RESPONSE: return "nwc_response";
   4569 		case NDB_CKIND_HTTP_AUTH:    return "http_auth";
   4570 		case NDB_CKIND_LIST:         return "list";
   4571 		case NDB_CKIND_LONGFORM:     return "longform";
   4572 		case NDB_CKIND_STATUS:       return "status";
   4573 		case NDB_CKIND_COUNT:        return "unknown";
   4574 	}
   4575 
   4576 	return "unknown";
   4577 }
   4578 
   4579 const char *ndb_db_name(enum ndb_dbs db)
   4580 {
   4581 	switch (db) {
   4582 		case NDB_DB_NOTE:
   4583 			return "note";
   4584 		case NDB_DB_META:
   4585 			return "note_metadata";
   4586 		case NDB_DB_PROFILE:
   4587 			return "profile";
   4588 		case NDB_DB_NOTE_ID:
   4589 			return "note_index";
   4590 		case NDB_DB_PROFILE_PK:
   4591 			return "profile_pubkey_index";
   4592 		case NDB_DB_NDB_META:
   4593 			return "nostrdb_metadata";
   4594 		case NDB_DB_PROFILE_SEARCH:
   4595 			return "profile_search";
   4596 		case NDB_DB_PROFILE_LAST_FETCH:
   4597 			return "profile_last_fetch";
   4598 		case NDB_DB_NOTE_KIND:
   4599 			return "note_kind_index";
   4600 		case NDB_DB_NOTE_TEXT:
   4601 			return "note_fulltext";
   4602 		case NDB_DBS:
   4603 			return "count";
   4604 	}
   4605 
   4606 	return "unknown";
   4607 }