nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

nostrdb.c (174139B)


      1 
      2 #include "nostrdb.h"
      3 #include "jsmn.h"
      4 #include "hex.h"
      5 #include "cursor.h"
      6 #include "random.h"
      7 #include "ccan/crypto/sha256/sha256.h"
      8 #include "bolt11/bolt11.h"
      9 #include "bolt11/amount.h"
     10 #include "lmdb.h"
     11 #include "util.h"
     12 #include "cpu.h"
     13 #include "block.h"
     14 #include "threadpool.h"
     15 #include "thread.h"
     16 #include "protected_queue.h"
     17 #include "memchr.h"
     18 #include "print_util.h"
     19 #include <stdlib.h>
     20 #include <limits.h>
     21 #include <assert.h>
     22 
     23 #include "bindings/c/profile_json_parser.h"
     24 #include "bindings/c/profile_builder.h"
     25 #include "bindings/c/meta_builder.h"
     26 #include "bindings/c/meta_reader.h"
     27 #include "bindings/c/profile_verifier.h"
     28 #include "secp256k1.h"
     29 #include "secp256k1_ecdh.h"
     30 #include "secp256k1_schnorrsig.h"
     31 
// NOTE: both macros evaluate their arguments more than once, so do not pass
// expressions with side effects (e.g. max(i++, j)).
#define max(a,b) ((a) > (b) ? (a) : (b))
#define min(a,b) ((a) < (b) ? (a) : (b))

// the maximum number of things threads pop and push in bulk
#define THREAD_QUEUE_BATCH 4096

// maximum number of active subscriptions
#define MAX_SUBSCRIPTIONS 256
// NOTE(review): presumably the cap on cursors used by a single scan -- confirm
// at the use site
#define MAX_SCAN_CURSORS 12
// maximum number of filters held by a struct ndb_filter_group
#define MAX_FILTERS    16

// the maximum size of inbox queues
static const int DEFAULT_QUEUE_SIZE = 32768;

// number of 4096-byte pages ndb_filter_init allocates per filter.
// increase if we need bigger filters
#define NDB_FILTER_PAGES 64
     48 
     49 #define ndb_flag_set(flags, f) ((flags & f) == f)
     50 
// One bit per note field; NDB_PARSED_ALL is the OR of all of them.
// NOTE(review): presumably used to track which fields have been seen while
// parsing a note -- confirm at the use site.
#define NDB_PARSED_ID           (1 << 0)
#define NDB_PARSED_PUBKEY       (1 << 1)
#define NDB_PARSED_SIG          (1 << 2)
#define NDB_PARSED_CREATED_AT   (1 << 3)
#define NDB_PARSED_KIND         (1 << 4)
#define NDB_PARSED_CONTENT      (1 << 5)
#define NDB_PARSED_TAGS         (1 << 6)
#define NDB_PARSED_ALL          (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)
     59 
// Signature of a migration step; it receives a transaction.
// NOTE(review): return convention is presumably 1 = success, 0 = failure like
// the rest of this file -- confirm.
typedef int (*ndb_migrate_fn)(struct ndb_txn *);
// Callback invoked with one word: an opaque context pointer, the word (with
// an explicit length) and the word's index.
typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len,
				  int word_index);
     63 
     64 // these must be byte-aligned, they are directly accessing the serialized data
     65 // representation
     66 #pragma pack(push, 1)
     67 
     68 union ndb_packed_str {
     69 	struct {
     70 		char str[3];
     71 		// we assume little endian everywhere. sorry not sorry.
     72 		unsigned char flag; // NDB_PACKED_STR, etc
     73 	} packed;
     74 
     75 	uint32_t offset;
     76 	unsigned char bytes[4];
     77 };
     78 
     79 struct ndb_tag {
     80 	uint16_t count;
     81 	union ndb_packed_str strs[0];
     82 };
     83 
     84 struct ndb_tags {
     85 	uint16_t padding;
     86 	uint16_t count;
     87 };
     88 
     89 // v1
     90 struct ndb_note {
     91 	unsigned char version;    // v=1
     92 	unsigned char padding[3]; // keep things aligned
     93 	unsigned char id[32];
     94 	unsigned char pubkey[32];
     95 	unsigned char sig[64];
     96 
     97 	uint64_t created_at;
     98 	uint32_t kind;
     99 	uint32_t content_length;
    100 	union ndb_packed_str content;
    101 	uint32_t strings;
    102 	// nothing can come after tags since it contains variadic data
    103 	struct ndb_tags tags;
    104 };
    105 
    106 #pragma pack(pop)
    107 
    108 
// One migration step, wrapping the function that performs it.
struct ndb_migration {
	ndb_migrate_fn fn;
};
    112 
// Pairs a flatcc builder with a buffer pointer.
// NOTE(review): flatbuf is presumably the buffer produced by `builder` --
// confirm ownership and lifetime at the use site.
struct ndb_profile_record_builder {
	flatcc_builder_t *builder;
	void *flatbuf;
};
    117 
// controls whether to continue or stop the json parser
enum ndb_idres {
	NDB_IDRES_CONT, // keep parsing
	NDB_IDRES_STOP, // stop parsing
};
    123 
// closure data for the id-detecting ingest controller
struct ndb_ingest_controller
{
	MDB_txn *read_txn;     // LMDB transaction handle
	struct ndb_lmdb *lmdb; // environment and database handles
};
    130 
// Kinds of writer message.
// NOTE(review): presumably these tag the messages delivered to the writer's
// inbox (struct ndb_writer) -- confirm.
enum ndb_writer_msgtype {
	NDB_WRITER_QUIT, // kill thread immediately
	NDB_WRITER_NOTE, // write a note to the db
	NDB_WRITER_PROFILE, // write a profile to the db
	NDB_WRITER_DBMETA, // write ndb metadata
	NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
	NDB_WRITER_BLOCKS, // write parsed note blocks
	NDB_WRITER_MIGRATE, // migrate the database
};
    140 
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
	// numbering starts at 1, so 0 is not a valid key here
	NDB_META_KEY_VERSION = 1
};
    145 
// State for parsing a JSON document with jsmn while feeding an ndb_builder.
struct ndb_json_parser {
	const char *json;          // input text
	int json_len;              // length of `json`
	struct ndb_builder builder;
	jsmn_parser json_parser;   // jsmn tokenizer state
	// NOTE(review): presumably the token array and its end -- confirm
	jsmntok_t *toks, *toks_end;
	// NOTE(review): presumably the index of the current token -- confirm
	int i;
	int num_tokens;
};
    155 
// The LMDB environment plus one database handle per NDB_DBS entry.
// useful to pass to threads on its own
struct ndb_lmdb {
	MDB_env *env;
	MDB_dbi dbs[NDB_DBS];
};
    161 
// State of the writer: database handles, the subscription monitor, its
// thread id and the inbox it receives messages on.
struct ndb_writer {
	struct ndb_lmdb *lmdb;
	struct ndb_monitor *monitor;

	uint32_t ndb_flags;
	// NOTE(review): presumably the backing storage for `inbox` -- confirm
	void *queue_buf;
	int queue_buflen;
	pthread_t thread_id;

	struct prot_queue inbox;
};
    173 
// State of the ingester: a thread pool, a pointer to the writer's inbox and
// an optional ingest filter callback with its context.
struct ndb_ingester {
	struct ndb_lmdb *lmdb;
	uint32_t flags;
	struct threadpool tp;
	struct prot_queue *writer_inbox;
	void *filter_context;        // passed through to `filter` (presumably)
	ndb_ingest_filter_fn filter;
};
    182 
// A fixed-capacity set of up to MAX_FILTERS filters.
struct ndb_filter_group {
	struct ndb_filter filters[MAX_FILTERS];
	int num_filters; // number of entries of `filters` in use
};
    187 
// A single subscription: its id, the filters it is made of and its own
// inbox queue.
struct ndb_subscription {
	uint64_t subid;
	struct ndb_filter_group group;
	struct prot_queue inbox;
};
    193 
// Holds all active subscriptions (at most MAX_SUBSCRIPTIONS) plus the
// subscription callback and its context.
struct ndb_monitor {
	struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
	ndb_sub_fn sub_cb;
	void *sub_cb_ctx;
	int num_subscriptions; // number of entries of `subscriptions` in use

	// monitor isn't a full inbox. We want pollers to be able to poll
	// subscriptions efficiently without going through a message queue, so
	// we use a simple mutex here.
	pthread_mutex_t mutex;
};
    205 
// Top-level database object; it embeds the lmdb handles, the ingester, the
// subscription monitor and the writer by value.
struct ndb {
	struct ndb_lmdb lmdb;
	struct ndb_ingester ingester;
	struct ndb_monitor monitor;
	struct ndb_writer writer;
	int version;
	uint32_t flags; // setting flags
	// lmdb environ handles, etc
};
    215 
///
/// Query Plans
///
/// There are general strategies for performing certain types of query
/// depending on the filter. For example, for large contact list queries
/// with many authors, we simply do a descending scan on created_at
/// instead of doing 1000s of pubkey scans.
///
/// Query plans are calculated from filters via `ndb_filter_plan`
///
/// NOTE(review): each enumerator presumably names the index that is scanned
/// (kinds, ids, authors, author+kind, created_at, tags) -- confirm against
/// `ndb_filter_plan`.
///
enum ndb_query_plan {
	NDB_PLAN_KINDS,
	NDB_PLAN_IDS,
	NDB_PLAN_AUTHORS,
	NDB_PLAN_AUTHOR_KINDS,
	NDB_PLAN_CREATED,
	NDB_PLAN_TAGS,
};
    234 
// A 32-byte id + u64 + timestamp key
struct ndb_id_u64_ts {
	unsigned char id[32]; // pubkey, id, etc
	uint64_t u64; // kind, etc
	uint64_t timestamp;
};
    241 
// A clustered key with a 32-byte id and a timestamp
struct ndb_tsid {
	unsigned char id[32];
	uint64_t timestamp;
};
    247 
// A u64 + timestamp key. Just using this for kinds at the moment.
struct ndb_u64_ts {
	uint64_t u64; // kind, etc
	uint64_t timestamp;
};
    253 
// A word referenced by pointer and explicit length.
// NOTE(review): presumably not null-terminated and borrowed from the text it
// was found in -- confirm.
struct ndb_word
{
	const char *word;
	int word_len;
};
    259 
// A fixed-capacity list of up to MAX_TEXT_SEARCH_WORDS words.
struct ndb_search_words
{
	struct ndb_word words[MAX_TEXT_SEARCH_WORDS];
	int num_words; // number of entries of `words` in use
};
    265 
    266 // ndb_text_search_key
    267 //
    268 // This is compressed when in lmdb:
    269 //
    270 //   note_id:    varint
    271 //   strlen:     varint
    272 //   str:        cstr
    273 //   timestamp:  varint
    274 //   word_index: varint
    275 //
    276 static int ndb_make_text_search_key(unsigned char *buf, int bufsize,
    277 				    int word_index, int word_len, const char *str,
    278 				    uint64_t timestamp, uint64_t note_id,
    279 				    int *keysize)
    280 {
    281 	struct cursor cur;
    282 	make_cursor(buf, buf + bufsize, &cur);
    283 
    284 	// TODO: need update this to uint64_t
    285 	// we push this first because our query function can pull this off
    286 	// quickly to check matches
    287 	if (!cursor_push_varint(&cur, (int32_t)note_id))
    288 		return 0;
    289 
    290 	// string length
    291 	if (!cursor_push_varint(&cur, word_len))
    292 		return 0;
    293 
    294 	// non-null terminated, lowercase string
    295 	if (!cursor_push_lowercase(&cur, str, word_len))
    296 		return 0;
    297 
    298 	// TODO: need update this to uint64_t
    299 	if (!cursor_push_varint(&cur, (int)timestamp))
    300 		return 0;
    301 
    302 	// the index of the word in the content so that we can do more accurate
    303 	// phrase searches
    304 	if (!cursor_push_varint(&cur, word_index))
    305 		return 0;
    306 
    307 	// pad to 8-byte alignment
    308 	if (!cursor_align(&cur, 8))
    309 		return 0;
    310 
    311 	*keysize = cur.p - cur.start;
    312 	assert((*keysize % 8) == 0);
    313 
    314 	return 1;
    315 }
    316 
    317 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize,
    318 					  int wordlen, const char *word,
    319 					  uint64_t timestamp, uint64_t note_id,
    320 					  int *keysize)
    321 {
    322 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    323 					timestamp, note_id, keysize);
    324 }
    325 
    326 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize,
    327 					int wordlen, const char *word,
    328 					int *keysize)
    329 {
    330 	uint64_t timestamp, note_id;
    331 	timestamp = 0;
    332 	note_id = 0;
    333 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    334 					timestamp, note_id, keysize);
    335 }
    336 
    337 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize,
    338 					 int wordlen, const char *word,
    339 					 int *keysize)
    340 {
    341 	uint64_t timestamp, note_id;
    342 	timestamp = INT32_MAX;
    343 	note_id = INT32_MAX;
    344 	return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word,
    345 					timestamp, note_id, keysize);
    346 }
    347 
// Signature shared by ndb_make_text_search_key_low and
// ndb_make_text_search_key_high, so a caller can select either through a
// pointer.
typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize);
    349 
    350 /** From LMDB: Compare two items lexically */
    351 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) {
    352 	int diff;
    353 	ssize_t len_diff;
    354 	unsigned int len;
    355 
    356 	len = a->mv_size;
    357 	len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size;
    358 	if (len_diff > 0) {
    359 		len = b->mv_size;
    360 		len_diff = 1;
    361 	}
    362 
    363 	diff = memcmp(a->mv_data, b->mv_data, len);
    364 	return diff ? diff : len_diff<0 ? -1 : len_diff;
    365 }
    366 
    367 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b)
    368 {
    369 	MDB_val va, vb;
    370 	uint64_t ts_a, ts_b;
    371 	int cmp;
    372 
    373 	va.mv_data = a->mv_data;
    374 	va.mv_size = a->mv_size - 8;
    375 
    376 	vb.mv_data = b->mv_data;
    377 	vb.mv_size = b->mv_size - 8;
    378 
    379 	if ((cmp = mdb_cmp_memn(&va, &vb)))
    380 		return cmp;
    381 
    382 	ts_a = *(uint64_t*)((unsigned char *)va.mv_data + va.mv_size);
    383 	ts_b = *(uint64_t*)((unsigned char *)vb.mv_data + vb.mv_size);
    384 
    385 	if (ts_a < ts_b)
    386 		return -1;
    387 	else if (ts_a > ts_b)
    388 		return 1;
    389 
    390 	return 0;
    391 }
    392 
    393 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b)
    394 {
    395 	struct cursor ca, cb;
    396 	uint64_t sa, sb, nid_a, nid_b;
    397 	MDB_val a2, b2;
    398 
    399 	make_cursor(a->mv_data, (unsigned char *)a->mv_data + a->mv_size, &ca);
    400 	make_cursor(b->mv_data, (unsigned char *)b->mv_data + b->mv_size, &cb);
    401 
    402 	// note_id
    403 	if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b)))
    404 		return 0;
    405 
    406 	// string size
    407 	if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb)))
    408 		return 0;
    409 
    410 	a2.mv_data = ca.p;
    411 	a2.mv_size = sa;
    412 
    413 	b2.mv_data = cb.p;
    414 	b2.mv_size = sb;
    415 
    416 	int cmp = mdb_cmp_memn(&a2, &b2);
    417 	if (cmp) return cmp;
    418 
    419 	// skip over string
    420 	ca.p += sa;
    421 	cb.p += sb;
    422 
    423 	// timestamp
    424 	if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb)))
    425 		return 0;
    426 
    427 	if      (sa < sb) return -1;
    428 	else if (sa > sb) return 1;
    429 
    430 	// note_id
    431 	if      (nid_a < nid_b) return -1;
    432 	else if (nid_a > nid_b) return 1;
    433 
    434 	// word index
    435 	if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb)))
    436 		return 0;
    437 
    438 	if      (sa < sb) return -1;
    439 	else if (sa > sb) return 1;
    440 
    441 	return 0;
    442 }
    443 
    444 static inline int ndb_unpack_text_search_key_noteid(
    445 		struct cursor *cur, uint64_t *note_id)
    446 {
    447 	if (!cursor_pull_varint(cur, note_id))
    448 		return 0;
    449 
    450 	return 1;
    451 }
    452 
    453 // faster peek of just the string instead of unpacking everything
    454 // this is used to quickly discard range query matches if there is no
    455 // common prefix
    456 static inline int ndb_unpack_text_search_key_string(struct cursor *cur,
    457 						    const char **str,
    458 						    int *str_len)
    459 {
    460 	uint64_t len;
    461 
    462 	if (!cursor_pull_varint(cur, &len))
    463 		return 0;
    464 
    465 	*str_len = len;
    466 
    467 	*str = (const char *)cur->p;
    468 
    469 	if (!cursor_skip(cur, *str_len))
    470 		return 0;
    471 
    472 	return 1;
    473 }
    474 
    475 // should be called after ndb_unpack_text_search_key_string. It continues
    476 // the unpacking of a text search key if we've already started it.
    477 static inline int
    478 ndb_unpack_remaining_text_search_key(struct cursor *cur,
    479 				     struct ndb_text_search_key *key)
    480 {
    481 	if (!cursor_pull_varint(cur, &key->timestamp))
    482 		return 0;
    483 
    484 	if (!cursor_pull_varint(cur, &key->word_index))
    485 		return 0;
    486 
    487 	return 1;
    488 }
    489 
    490 // unpack a fulltext search key
    491 //
    492 // full version of string + unpack remaining. This is split up because text
    493 // searching only requires to pull the string for prefix searching, and the
    494 // remaining is optional
    495 static inline int ndb_unpack_text_search_key(unsigned char *p, int len,
    496 				      struct ndb_text_search_key *key)
    497 {
    498 	struct cursor c;
    499 	make_cursor(p, p + len, &c);
    500 
    501 	if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id))
    502 		return 0;
    503 
    504 	if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len))
    505 		return 0;
    506 
    507 	return ndb_unpack_remaining_text_search_key(&c, key);
    508 }
    509 
    510 // Copies only lowercase characters to the destination string and fills the rest with null bytes.
    511 // `dst` and `src` are pointers to the destination and source strings, respectively.
    512 // `n` is the maximum number of characters to copy.
    513 static void lowercase_strncpy(char *dst, const char *src, int n) {
    514 	int j = 0, i = 0;
    515 
    516 	if (!dst || !src || n == 0) {
    517 		return;
    518 	}
    519 
    520 	while (src[i] != '\0' && j < n) {
    521 		dst[j++] = tolower(src[i++]);
    522 	}
    523 
    524 	// Null-terminate and fill the destination string
    525 	while (j < n) {
    526 		dst[j++] = '\0';
    527 	}
    528 }
    529 
    530 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) {
    531 	return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID;
    532 }
    533 
    534 // Copy the filter
    535 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src)
    536 {
    537 	size_t src_size, elem_size, data_size;
    538 
    539 	memcpy(dst, src, sizeof(*src));
    540 
    541 	elem_size = src->elem_buf.end - src->elem_buf.start;
    542 	data_size = src->data_buf.end - src->data_buf.start;
    543 	src_size = data_size + elem_size;
    544 
    545 	// let's only allow finalized filters to be cloned
    546 	if (!src || !src->finalized)
    547 		return 0;
    548 
    549 	dst->elem_buf.start = malloc(src_size);
    550 	dst->elem_buf.end = dst->elem_buf.start + elem_size;
    551 	dst->elem_buf.p = dst->elem_buf.end;
    552 
    553 	dst->data_buf.start = dst->elem_buf.start + elem_size;
    554 	dst->data_buf.end = dst->data_buf.start + data_size;
    555 	dst->data_buf.p = dst->data_buf.end;
    556 
    557 	if (dst->elem_buf.start == NULL)
    558 		return 0;
    559 
    560 	memcpy(dst->elem_buf.start, src->elem_buf.start, src_size);
    561 
    562 	return 1;
    563 }
    564 
    565 // "Finalize" the filter. This resizes the allocated heap buffers so that they
    566 // are as small as possible. This also prevents new fields from being added
    567 int ndb_filter_end(struct ndb_filter *filter)
    568 {
    569 #ifdef DEBUG
    570 	size_t orig_size;
    571 #endif
    572 	size_t data_len, elem_len;
    573 	if (filter->finalized == 1)
    574 		return 0;
    575 
    576 	// move the data buffer to the end of the element buffer and update
    577 	// all of the element pointers accordingly
    578 	data_len = filter->data_buf.p - filter->data_buf.start;
    579 	elem_len = filter->elem_buf.p - filter->elem_buf.start;
    580 #ifdef DEBUG
    581 	orig_size = filter->data_buf.end - filter->elem_buf.start;
    582 #endif
    583 
    584 	// cap the elem buff
    585 	filter->elem_buf.end = filter->elem_buf.p;
    586 
    587 	// move the data buffer to the end of the element buffer
    588 	memmove(filter->elem_buf.p, filter->data_buf.start, data_len);
    589 
    590 	// realloc the whole thing
    591 	filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len);
    592 	filter->elem_buf.end = filter->elem_buf.start + elem_len;
    593 	filter->elem_buf.p = filter->elem_buf.end;
    594 
    595 	filter->data_buf.start = filter->elem_buf.end;
    596 	filter->data_buf.end = filter->data_buf.start + data_len;
    597 	filter->data_buf.p = filter->data_buf.end;
    598 
    599 	filter->finalized = 1;
    600 
    601 	ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len);
    602 
    603 	return 1;
    604 }
    605 
    606 static inline struct ndb_filter_elements *
    607 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset)
    608 {
    609 	struct ndb_filter_elements *els;
    610 
    611 	if (offset < 0)
    612 		return NULL;
    613 
    614 	els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset);
    615 
    616 	if ((unsigned char *)els > filter->elem_buf.p)
    617 		return NULL;
    618 
    619 	return els;
    620 }
    621 
    622 struct ndb_filter_elements *
    623 ndb_filter_current_element(const struct ndb_filter *filter)
    624 {
    625 	return ndb_filter_get_elements_by_offset(filter, filter->current);
    626 }
    627 
    628 struct ndb_filter_elements *
    629 ndb_filter_get_elements(const struct ndb_filter *filter, int index)
    630 {
    631 	if (filter->num_elements <= 0)
    632 		return NULL;
    633 
    634 	if (index > filter->num_elements-1)
    635 		return NULL;
    636 
    637 	return ndb_filter_get_elements_by_offset(filter, filter->elements[index]);
    638 }
    639 
    640 static inline unsigned char *
    641 ndb_filter_elements_data(const struct ndb_filter *filter, int offset)
    642 {
    643 	unsigned char *data;
    644 
    645 	if (offset < 0)
    646 		return NULL;
    647 
    648 	data = filter->data_buf.start + offset;
    649 	if (data > filter->data_buf.p)
    650 		return NULL;
    651 
    652 	return data;
    653 }
    654 
    655 unsigned char *
    656 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index)
    657 {
    658 	return ndb_filter_elements_data(filter, els->elements[index]);
    659 }
    660 
    661 const char *
    662 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index)
    663 {
    664 	return (const char *)ndb_filter_elements_data(filter, els->elements[index]);
    665 }
    666 
    667 uint64_t *
    668 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index)
    669 {
    670 	return &els->elements[index];
    671 }
    672 
    673 uint64_t
    674 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index)
    675 {
    676 	return els->elements[index];
    677 }
    678 
    679 int ndb_filter_init(struct ndb_filter *filter)
    680 {
    681 	struct cursor cur;
    682 	int page_size, elem_pages, data_pages, buf_size;
    683 
    684 	page_size = 4096; // assuming this, not a big deal if we're wrong
    685 	elem_pages = NDB_FILTER_PAGES / 4;
    686 	data_pages = NDB_FILTER_PAGES - elem_pages;
    687 	buf_size = page_size * NDB_FILTER_PAGES;
    688 
    689 	unsigned char *buf = malloc(buf_size);
    690 	if (!buf)
    691 		return 0;
    692 
    693 	// init memory arena for the cursor
    694 	make_cursor(buf, buf + buf_size, &cur);
    695 
    696 	cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages);
    697 	cursor_slice(&cur, &filter->data_buf, page_size * data_pages);
    698 
    699 	// make sure we are fully allocated
    700 	assert(cur.p == cur.end);
    701 
    702 	// make sure elem_buf is the start of the buffer
    703 	assert(filter->elem_buf.start == cur.start);
    704 
    705 	filter->num_elements = 0;
    706 	filter->elements[0] = 0;
    707 	filter->current = -1;
    708 	filter->finalized = 0;
    709 
    710 	return 1;
    711 }
    712 
    713 void ndb_filter_destroy(struct ndb_filter *filter)
    714 {
    715 	if (filter->elem_buf.start)
    716 		free(filter->elem_buf.start);
    717 
    718 	memset(filter, 0, sizeof(*filter));
    719 }
    720 
    721 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field)
    722 {
    723 	switch (field) {
    724 	case NDB_FILTER_IDS: return "ids";
    725 	case NDB_FILTER_AUTHORS: return "authors";
    726 	case NDB_FILTER_KINDS: return "kinds";
    727 	case NDB_FILTER_TAGS: return "tags";
    728 	case NDB_FILTER_SINCE: return "since";
    729 	case NDB_FILTER_UNTIL: return "until";
    730 	case NDB_FILTER_LIMIT: return "limit";
    731 	}
    732 
    733 	return "unknown";
    734 }
    735 
    736 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag)
    737 {
    738 	int i;
    739 	struct ndb_filter_elements *els, *el;
    740 
    741 	if (ndb_filter_current_element(filter)) {
    742 		fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n");
    743 		return 0;
    744 	}
    745 
    746 	// you can only start and end fields once
    747 	for (i = 0; i < filter->num_elements; i++) {
    748 		el = ndb_filter_get_elements(filter, i);
    749 		assert(el);
    750 		// TODO: fix this tags check to try to find matching tags
    751 		if (el->field.type == field && field != NDB_FILTER_TAGS) {
    752 			fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n",
    753 					ndb_filter_field_name(field));
    754 			return 0;
    755 		}
    756 	}
    757 
    758 	filter->current = filter->elem_buf.p - filter->elem_buf.start;
    759 	els = ndb_filter_current_element(filter);
    760 	assert(els);
    761 
    762 	// advance elem buffer to the variable data section
    763 	if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) {
    764 		fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n",
    765 				ndb_filter_field_name(field));
    766 		return 0;
    767 	}
    768 
    769 	els->field.type = field;
    770 	els->field.tag = tag;
    771 	els->field.elem_type = 0;
    772 	els->count = 0;
    773 
    774 	return 1;
    775 }
    776 
    777 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field)
    778 {
    779 	return ndb_filter_start_field_impl(filter, field, 0);
    780 }
    781 
    782 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag)
    783 {
    784 	return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag);
    785 }
    786 
    787 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el)
    788 {
    789 	struct ndb_filter_elements *current;
    790 	uint64_t offset;
    791 
    792 	if (!(current = ndb_filter_current_element(filter)))
    793 		return 0;
    794 
    795 	offset = filter->data_buf.p - filter->data_buf.start;
    796 
    797 	switch (current->field.type) {
    798 	case NDB_FILTER_IDS:
    799 	case NDB_FILTER_AUTHORS:
    800 		if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32))
    801 			return 0;
    802 		break;
    803 	case NDB_FILTER_KINDS:
    804 		offset = el.integer;
    805 		break;
    806 	case NDB_FILTER_SINCE:
    807 	case NDB_FILTER_UNTIL:
    808 	case NDB_FILTER_LIMIT:
    809 		// only one allowed for since/until
    810 		if (current->count != 0)
    811 			return 0;
    812 		offset = el.integer;
    813 		break;
    814 	case NDB_FILTER_TAGS:
    815 		switch (current->field.elem_type) {
    816 		case NDB_ELEMENT_ID:
    817 			if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32))
    818 				return 0;
    819 			break;
    820 		case NDB_ELEMENT_STRING:
    821 			if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len))
    822 				return 0;
    823 			if (!cursor_push_byte(&filter->data_buf, 0))
    824 				return 0;
    825 			break;
    826 		case NDB_ELEMENT_INT:
    827 			// ints are not allowed in tag filters
    828 		case NDB_ELEMENT_UNKNOWN:
    829 			return 0;
    830 		}
    831 		// push a pointer of the string in the databuf as an element
    832 		break;
    833 	}
    834 
    835 	if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset,
    836 			 sizeof(offset))) {
    837 		return 0;
    838 	}
    839 
    840 	current->count++;
    841 
    842 	return 1;
    843 }
    844 
    845 static int ndb_filter_set_elem_type(struct ndb_filter *filter,
    846 				    enum ndb_generic_element_type elem_type)
    847 {
    848 	enum ndb_generic_element_type current_elem_type;
    849 	struct ndb_filter_elements *current;
    850 
    851 	if (!(current = ndb_filter_current_element(filter)))
    852 		return 0;
    853 
    854 	current_elem_type = current->field.elem_type;
    855 
    856 	// element types must be uniform
    857 	if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) {
    858 		fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n");
    859 		return 0;
    860 	}
    861 
    862 	current->field.elem_type = elem_type;
    863 
    864 	return 1;
    865 }
    866 
    867 
    868 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len)
    869 {
    870 	union ndb_filter_element el;
    871 	struct ndb_filter_elements *current;
    872 
    873 	if (!(current = ndb_filter_current_element(filter)))
    874 		return 0;
    875 
    876 	// only generic queries are allowed to have strings
    877 	switch (current->field.type) {
    878 	case NDB_FILTER_SINCE:
    879 	case NDB_FILTER_UNTIL:
    880 	case NDB_FILTER_LIMIT:
    881 	case NDB_FILTER_IDS:
    882 	case NDB_FILTER_AUTHORS:
    883 	case NDB_FILTER_KINDS:
    884 		return 0;
    885 	case NDB_FILTER_TAGS:
    886 		break;
    887 	}
    888 
    889 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING))
    890 		return 0;
    891 
    892 	el.string.string = str;
    893 	el.string.len = len;
    894 
    895 	return ndb_filter_add_element(filter, el);
    896 }
    897 
    898 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str)
    899 {
    900 	return ndb_filter_add_str_element_len(filter, str, strlen(str));
    901 }
    902 
    903 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer)
    904 {
    905 	union ndb_filter_element el;
    906 	struct ndb_filter_elements *current;
    907 	if (!(current = ndb_filter_current_element(filter)))
    908 		return 0;
    909 
    910 	switch (current->field.type) {
    911 	case NDB_FILTER_IDS:
    912 	case NDB_FILTER_AUTHORS:
    913 	case NDB_FILTER_TAGS:
    914 		return 0;
    915 	case NDB_FILTER_KINDS:
    916 	case NDB_FILTER_SINCE:
    917 	case NDB_FILTER_UNTIL:
    918 	case NDB_FILTER_LIMIT:
    919 		break;
    920 	}
    921 
    922 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT))
    923 		return 0;
    924 
    925 	el.integer = integer;
    926 
    927 	return ndb_filter_add_element(filter, el);
    928 }
    929 
    930 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id)
    931 {
    932 	union ndb_filter_element el;
    933 	struct ndb_filter_elements *current;
    934 
    935 	if (!(current = ndb_filter_current_element(filter)))
    936 		return 0;
    937 
    938 	// only certain filter types allow pushing id elements
    939 	switch (current->field.type) {
    940 	case NDB_FILTER_SINCE:
    941 	case NDB_FILTER_UNTIL:
    942 	case NDB_FILTER_LIMIT:
    943 	case NDB_FILTER_KINDS:
    944 		return 0;
    945 	case NDB_FILTER_IDS:
    946 	case NDB_FILTER_AUTHORS:
    947 	case NDB_FILTER_TAGS:
    948 		break;
    949 	}
    950 
    951 	if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID))
    952 		return 0;
    953 
    954 	// this is needed so that generic filters know its an id
    955 	el.id = id;
    956 
    957 	return ndb_filter_add_element(filter, el);
    958 }
    959 
// Returns 1 if `note` has at least one tag whose first entry is the single
// character `els->field.tag` and whose second entry equals one of the
// elements in `els`; returns 0 otherwise.
//
// The element type of `els` decides how the second tag entry is compared:
// id elements are compared as 32 bytes against packed ids, string elements
// are compared with strcmp against non-id strings. Any other element type is
// reported on stderr and treated as a non-match.
static int ndb_tag_filter_matches(struct ndb_filter *filter,
				  struct ndb_filter_elements *els,
				  struct ndb_note *note)
{
	int i;
	const unsigned char *id;
	const char *el_str;
	struct ndb_iterator iter, *it = &iter;
	struct ndb_str str;

	ndb_tags_iterate_start(note, it);

	while (ndb_tags_iterate_next(it)) {
		// we're looking for tags with 2 or more entries: ["p", id], etc
		if (it->tag->count < 2)
			continue;

		// first entry: the tag name
		str = ndb_tag_str(note, it->tag, 0);

		// we only care about packed strings (single char, etc)
		if (str.flag != NDB_PACKED_STR)
			continue;

		// do we have #e matching e (or p, etc)
		if (str.str[0] != els->field.tag || str.str[1] != 0)
			continue;

		// second entry: the tag value we compare against the filter
		str = ndb_tag_str(note, it->tag, 1);

		// NOTE: the `continue` statements inside this switch skip to
		// the next tag of the enclosing while loop
		switch (els->field.elem_type) {
		case NDB_ELEMENT_ID:
			// if our filter element type is an id, then we
			// expect a packed id in the tag, otherwise skip
			if (str.flag != NDB_PACKED_ID)
				continue;
			break;
		case NDB_ELEMENT_STRING:
			// if our filter element type is a string, then
			// we should not expect an id
			if (str.flag == NDB_PACKED_ID)
				continue;

			break;
		case NDB_ELEMENT_UNKNOWN:
		default:
			// For some reason the element type is not set. It's
			// possible nothing was added to the generic filter?
			// Let's just fail here and log a note for debugging
			fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type);
			return 0;
		}

		// linear scan: tag fields are not sorted
		for (i = 0; i < els->count; i++) {
			switch (els->field.elem_type) {
			case NDB_ELEMENT_ID:
				id = ndb_filter_get_id_element(filter, els, i);
				if (!memcmp(id, str.id, 32))
					return 1;
				break;
			case NDB_ELEMENT_STRING:
				el_str = ndb_filter_get_string_element(filter, els, i);
				if (!strcmp(el_str, str.str))
					return 1;
				break;
			case NDB_ELEMENT_INT:
				// int elements in tag queries are not supported
			case NDB_ELEMENT_UNKNOWN:
				return 0;
			}
		}
	}

	return 0;
}
   1034 
// Lookup state handed to bsearch as the "key" argument when searching an
// id/pubkey filter field (see search_ids).
struct search_id_state {
	// filter whose element data the field's elements refer to
	struct ndb_filter *filter;
	// the field being searched
	struct ndb_filter_elements *els;
	// the id being looked up; search_ids compares 32 bytes of it
	unsigned char *key;
};
   1040 
   1041 static int compare_ids(const void *pa, const void *pb)
   1042 {
   1043         unsigned char *a = *(unsigned char **)pa;
   1044         unsigned char *b = *(unsigned char **)pb;
   1045 
   1046         return memcmp(a, b, 32);
   1047 }
   1048 
   1049 static int search_ids(const void *ctx, const void *mid_ptr)
   1050 {
   1051 	struct search_id_state *state;
   1052 	unsigned char *mid_id;
   1053 	uint32_t mid;
   1054 
   1055 	state = (struct search_id_state *)ctx;
   1056 	mid = *(uint32_t *)mid_ptr;
   1057 
   1058 	mid_id = ndb_filter_elements_data(state->filter, mid);
   1059 	assert(mid_id);
   1060 
   1061 	return memcmp(state->key, mid_id, 32);
   1062 }
   1063 
   1064 static int compare_kinds(const void *pa, const void *pb)
   1065 {
   1066 
   1067 	// NOTE: this should match type in `union ndb_filter_element`
   1068 	uint64_t a = *(uint64_t *)pa;
   1069 	uint64_t b = *(uint64_t *)pb;
   1070 
   1071 	if (a < b) {
   1072 		return -1;
   1073 	} else if (a > b) {
   1074 		return 1;
   1075 	} else {
   1076 		return 0;
   1077 	}
   1078 }
   1079 
   1080 //
   1081 // returns 1 if a filter matches a note
   1082 static int ndb_filter_matches_with(struct ndb_filter *filter,
   1083 				   struct ndb_note *note, int already_matched)
   1084 {
   1085 	int i, j;
   1086 	struct ndb_filter_elements *els;
   1087 	struct search_id_state state;
   1088 
   1089 	state.filter = filter;
   1090 
   1091 	for (i = 0; i < filter->num_elements; i++) {
   1092 		els = ndb_filter_get_elements(filter, i);
   1093 		state.els = els;
   1094 		assert(els);
   1095 
   1096 		// if we know we already match from a query scan result,
   1097 		// we can skip this check
   1098 		if ((1 << els->field.type) & already_matched)
   1099 			continue;
   1100 
   1101 		switch (els->field.type) {
   1102 		case NDB_FILTER_KINDS:
   1103 			for (j = 0; j < els->count; j++) {
   1104 				if ((unsigned int)els->elements[j] == note->kind)
   1105 					goto cont;
   1106 			}
   1107 			break;
   1108 		case NDB_FILTER_IDS:
   1109 			state.key = ndb_note_id(note);
   1110 			if (bsearch(&state, &els->elements[0], els->count,
   1111 				    sizeof(els->elements[0]), search_ids)) {
   1112 				continue;
   1113 			}
   1114 			break;
   1115 		case NDB_FILTER_AUTHORS:
   1116 			state.key = ndb_note_pubkey(note);
   1117 			if (bsearch(&state, &els->elements[0], els->count,
   1118 				    sizeof(els->elements[0]), search_ids)) {
   1119 				continue;
   1120 			}
   1121 			break;
   1122 		case NDB_FILTER_TAGS:
   1123 			if (ndb_tag_filter_matches(filter, els, note))
   1124 				continue;
   1125 			break;
   1126 		case NDB_FILTER_SINCE:
   1127 			assert(els->count == 1);
   1128 			if (note->created_at >= els->elements[0])
   1129 				continue;
   1130 			break;
   1131 		case NDB_FILTER_UNTIL:
   1132 			assert(els->count == 1);
   1133 			if (note->created_at < els->elements[0])
   1134 				continue;
   1135 		case NDB_FILTER_LIMIT:
   1136 cont:
   1137 			continue;
   1138 		}
   1139 
   1140 		// all need to match
   1141 		return 0;
   1142 	}
   1143 
   1144 	return 1;
   1145 }
   1146 
   1147 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
   1148 {
   1149 	return ndb_filter_matches_with(filter, note, 0);
   1150 }
   1151 
   1152 // because elements are stored as offsets and qsort doesn't support context,
   1153 // we do a dumb thing where we convert elements to pointers and back if we
   1154 // are doing an id or string sort
   1155 static void sort_filter_elements(struct ndb_filter *filter,
   1156 				 struct ndb_filter_elements *els,
   1157 				 int (*cmp)(const void *, const void *))
   1158 {
   1159 	int i;
   1160 
   1161 	assert(ndb_filter_elem_is_ptr(&els->field));
   1162 
   1163 	for (i = 0; i < els->count; i++)
   1164 		els->elements[i] += (uint64_t)filter->data_buf.start;
   1165 
   1166 	qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp);
   1167 
   1168 	for (i = 0; i < els->count; i++)
   1169 		els->elements[i] -= (uint64_t)filter->data_buf.start;
   1170 }
   1171 
   1172 static int ndb_filter_field_eq(struct ndb_filter *a_filt,
   1173 			       struct ndb_filter_elements *a_field,
   1174 			       struct ndb_filter *b_filt,
   1175 			       struct ndb_filter_elements *b_field)
   1176 {
   1177 	int i;
   1178 	const char *a_str, *b_str;
   1179 	unsigned char *a_id, *b_id;
   1180 	uint64_t a_int, b_int;
   1181 
   1182 	if (a_field->count != b_field->count)
   1183 		return 0;
   1184 
   1185 	if (a_field->field.type != b_field->field.type) {
   1186 		ndb_debug("UNUSUAL: field types do not match in ndb_filter_field_eq\n");
   1187 		return 0;
   1188 	}
   1189 
   1190 	if (a_field->field.elem_type != b_field->field.elem_type) {
   1191 		ndb_debug("UNUSUAL: field element types do not match in ndb_filter_field_eq\n");
   1192 		return 0;
   1193 	}
   1194 
   1195 	if (a_field->field.elem_type == NDB_ELEMENT_UNKNOWN) {
   1196 		ndb_debug("UNUSUAL: field element types are unknown\n");
   1197 		return 0;
   1198 	}
   1199 
   1200 	for (i = 0; i < a_field->count; i++) {
   1201 		switch (a_field->field.elem_type) {
   1202 		case NDB_ELEMENT_UNKNOWN:
   1203 			return 0;
   1204 		case NDB_ELEMENT_STRING:
   1205 			a_str = ndb_filter_get_string_element(a_filt, a_field, i);
   1206 			b_str = ndb_filter_get_string_element(b_filt, b_field, i);
   1207 			if (strcmp(a_str, b_str))
   1208 				return 0;
   1209 			break;
   1210 		case NDB_ELEMENT_ID:
   1211 			a_id = ndb_filter_get_id_element(a_filt, a_field, i);
   1212 			b_id = ndb_filter_get_id_element(b_filt, b_field, i);
   1213 			if (memcmp(a_id, b_id, 32))
   1214 				return 0;
   1215 			break;
   1216 		case NDB_ELEMENT_INT:
   1217 			a_int = ndb_filter_get_int_element(a_field, i);
   1218 			b_int = ndb_filter_get_int_element(b_field, i);
   1219 			if (a_int != b_int)
   1220 				return 0;
   1221 			break;
   1222 		}
   1223 	}
   1224 
   1225 	return 1;
   1226 }
   1227 
   1228 void ndb_filter_end_field(struct ndb_filter *filter)
   1229 {
   1230 	int cur_offset;
   1231 	struct ndb_filter_elements *cur;
   1232 
   1233 	cur_offset = filter->current;
   1234 
   1235 	if (!(cur = ndb_filter_current_element(filter)))
   1236 		return;
   1237 
   1238 	filter->elements[filter->num_elements++] = cur_offset;
   1239 
   1240 	// sort elements for binary search
   1241 	switch (cur->field.type) {
   1242 	case NDB_FILTER_IDS:
   1243 	case NDB_FILTER_AUTHORS:
   1244 		sort_filter_elements(filter, cur, compare_ids);
   1245 		break;
   1246 	case NDB_FILTER_KINDS:
   1247 		qsort(&cur->elements[0], cur->count,
   1248 		      sizeof(cur->elements[0]), compare_kinds);
   1249 		break;
   1250 	case NDB_FILTER_TAGS:
   1251 		// TODO: generic tag search sorting
   1252 		break;
   1253 	case NDB_FILTER_SINCE:
   1254 	case NDB_FILTER_UNTIL:
   1255 	case NDB_FILTER_LIMIT:
   1256 		// don't need to sort these
   1257 		break;
   1258 	}
   1259 
   1260 	filter->current = -1;
   1261 
   1262 }
   1263 
   1264 static void ndb_filter_group_init(struct ndb_filter_group *group)
   1265 {
   1266 	group->num_filters = 0;
   1267 }
   1268 
   1269 static int ndb_filter_group_add(struct ndb_filter_group *group,
   1270 				struct ndb_filter *filter)
   1271 {
   1272 	if (group->num_filters + 1 > MAX_FILTERS)
   1273 		return 0;
   1274 
   1275 	return ndb_filter_clone(&group->filters[group->num_filters++], filter);
   1276 }
   1277 
   1278 static int ndb_filter_group_matches(struct ndb_filter_group *group,
   1279 				    struct ndb_note *note)
   1280 {
   1281 	int i;
   1282 	struct ndb_filter *filter;
   1283 
   1284 	if (group->num_filters == 0)
   1285 		return 1;
   1286 
   1287 	for (i = 0; i < group->num_filters; i++) {
   1288 		filter = &group->filters[i];
   1289 
   1290 		if (ndb_filter_matches(filter, note))
   1291 			return 1;
   1292 	}
   1293 
   1294 	return 0;
   1295 }
   1296 
   1297 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
   1298 			        uint64_t timestamp, const char *search)
   1299 {
   1300 	memcpy(key->id, id, 32);
   1301 	key->timestamp = timestamp;
   1302 	lowercase_strncpy(key->search, search, sizeof(key->search) - 1);
   1303 	key->search[sizeof(key->search) - 1] = '\0';
   1304 }
   1305 
   1306 static int ndb_write_profile_search_index(struct ndb_txn *txn,
   1307 					  struct ndb_search_key *index_key,
   1308 					  uint64_t profile_key)
   1309 {
   1310 	int rc;
   1311 	MDB_val key, val;
   1312 
   1313 	key.mv_data = index_key;
   1314 	key.mv_size = sizeof(*index_key);
   1315 	val.mv_data = &profile_key;
   1316 	val.mv_size = sizeof(profile_key);
   1317 
   1318 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
   1319 			  &key, &val, 0)))
   1320 	{
   1321 		ndb_debug("ndb_write_profile_search_index failed: %s\n",
   1322 			  mdb_strerror(rc));
   1323 		return 0;
   1324 	}
   1325 
   1326 	return 1;
   1327 }
   1328 
   1329 
   1330 // map usernames and display names to profile keys for user searching
   1331 static int ndb_write_profile_search_indices(struct ndb_txn *txn,
   1332 					    struct ndb_note *note,
   1333 					    uint64_t profile_key,
   1334 					    void *profile_root)
   1335 {
   1336 	struct ndb_search_key index;
   1337 	NdbProfileRecord_table_t profile_record;
   1338 	NdbProfile_table_t profile;
   1339 
   1340 	profile_record = NdbProfileRecord_as_root(profile_root);
   1341 	profile = NdbProfileRecord_profile_get(profile_record);
   1342 
   1343 	const char *name = NdbProfile_name_get(profile);
   1344 	const char *display_name = NdbProfile_display_name_get(profile);
   1345 
   1346 	// words + pubkey + created
   1347 	if (name) {
   1348 		ndb_make_search_key(&index, note->pubkey, note->created_at,
   1349 				    name);
   1350 		if (!ndb_write_profile_search_index(txn, &index, profile_key))
   1351 			return 0;
   1352 	}
   1353 
   1354 	if (display_name) {
   1355 		// don't write the same name/display_name twice
   1356 		if (name && !strcmp(display_name, name)) {
   1357 			return 1;
   1358 		}
   1359 		ndb_make_search_key(&index, note->pubkey, note->created_at,
   1360 				    display_name);
   1361 		if (!ndb_write_profile_search_index(txn, &index, profile_key))
   1362 			return 0;
   1363 	}
   1364 
   1365 	return 1;
   1366 }
   1367 
   1368 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id,
   1369 				 uint64_t timestamp)
   1370 {
   1371 	memcpy(key->id, id, 32);
   1372 	key->timestamp = timestamp;
   1373 }
   1374 
   1375 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id)
   1376 {
   1377 	memcpy(key->id, id, 32);
   1378 	key->timestamp = 0;
   1379 }
   1380 
   1381 static inline void ndb_u64_ts_init(struct ndb_u64_ts *key, uint64_t integer,
   1382 				     uint64_t timestamp)
   1383 {
   1384 	key->u64 = integer;
   1385 	key->timestamp = timestamp;
   1386 }
   1387 
   1388 // useful for range-searching for the latest key with a clustered created_at timen
   1389 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id)
   1390 {
   1391 	memcpy(key->id, id, 32);
   1392 	key->timestamp = UINT64_MAX;
   1393 }
   1394 
   1395 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags)
   1396 {
   1397 	txn->lmdb = &ndb->lmdb;
   1398 	MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn;
   1399 	if (!txn->lmdb->env)
   1400 		return 0;
   1401 	return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0;
   1402 }
   1403 
   1404 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn)
   1405 {
   1406 	return _ndb_begin_query(ndb, txn, MDB_RDONLY);
   1407 }
   1408 
   1409 // this should only be used in migrations, etc
   1410 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn)
   1411 {
   1412 	return _ndb_begin_query(ndb, txn, 0);
   1413 }
   1414 
   1415 static int ndb_db_is_index(enum ndb_dbs index)
   1416 {
   1417 	switch (index) {
   1418 		case NDB_DB_NOTE:
   1419 		case NDB_DB_META:
   1420 		case NDB_DB_PROFILE:
   1421 		case NDB_DB_NOTE_ID:
   1422 		case NDB_DB_NDB_META:
   1423 		case NDB_DB_PROFILE_SEARCH:
   1424 		case NDB_DB_PROFILE_LAST_FETCH:
   1425 		case NDB_DBS:
   1426 			return 0;
   1427 		case NDB_DB_PROFILE_PK:
   1428 		case NDB_DB_NOTE_KIND:
   1429 		case NDB_DB_NOTE_TEXT:
   1430 		case NDB_DB_NOTE_BLOCKS:
   1431 		case NDB_DB_NOTE_TAGS:
   1432 		case NDB_DB_NOTE_PUBKEY:
   1433 		case NDB_DB_NOTE_PUBKEY_KIND:
   1434 			return 1;
   1435 	}
   1436 
   1437 	return 0;
   1438 }
   1439 
   1440 static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key,
   1441 				      unsigned char *id, uint64_t iu64,
   1442 				      uint64_t timestamp)
   1443 {
   1444 	memcpy(key->id, id, 32);
   1445 	key->u64 = iu64;
   1446 	key->timestamp = timestamp;
   1447 }
   1448 
   1449 static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note,
   1450 				       uint64_t note_key)
   1451 {
   1452 	int rc;
   1453 	struct ndb_tsid key;
   1454 	MDB_val k, v;
   1455 
   1456 	ndb_tsid_init(&key, ndb_note_pubkey(note), ndb_note_created_at(note));
   1457 
   1458 	k.mv_data = &key;
   1459 	k.mv_size = sizeof(key);
   1460 
   1461 	v.mv_data = &note_key;
   1462 	v.mv_size = sizeof(note_key);
   1463 
   1464 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY], &k, &v, 0))) {
   1465 		fprintf(stderr, "write note pubkey index failed: %s\n",
   1466 			  mdb_strerror(rc));
   1467 		return 0;
   1468 	}
   1469 
   1470 	return 1;
   1471 }
   1472 
   1473 static int ndb_write_note_pubkey_kind_index(struct ndb_txn *txn,
   1474 					    struct ndb_note *note,
   1475 					    uint64_t note_key)
   1476 {
   1477 	int rc;
   1478 	struct ndb_id_u64_ts key;
   1479 	MDB_val k, v;
   1480 
   1481 	ndb_id_u64_ts_init(&key, ndb_note_pubkey(note), ndb_note_kind(note),
   1482 			   ndb_note_created_at(note));
   1483 
   1484 	k.mv_data = &key;
   1485 	k.mv_size = sizeof(key);
   1486 
   1487 	v.mv_data = &note_key;
   1488 	v.mv_size = sizeof(note_key);
   1489 
   1490 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], &k, &v, 0))) {
   1491 		fprintf(stderr, "write note pubkey_kind index failed: %s\n",
   1492 			  mdb_strerror(rc));
   1493 		return 0;
   1494 	}
   1495 
   1496 	return 1;
   1497 }
   1498 
   1499 
   1500 static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, int num_indices)
   1501 {
   1502 	MDB_val k, v;
   1503 	MDB_cursor *cur;
   1504 	int i, drop_dbi, count, rc;
   1505 	uint64_t note_key;
   1506 	struct ndb_note *note;
   1507 	enum ndb_dbs index;
   1508 
   1509 	// 0 means empty, not delete the dbi
   1510 	drop_dbi = 0;
   1511 
   1512 	// ensure they are all index dbs
   1513 	for (i = 0; i < num_indices; i++) {
   1514 		if (!ndb_db_is_index(indices[i])) {
   1515 			fprintf(stderr, "ndb_rebuild_note_index: %s is not an index db\n", ndb_db_name(index));
   1516 			return -1;
   1517 		}
   1518 	}
   1519 
   1520 	// empty the index dbs before we rebuild
   1521 	for (i = 0; i < num_indices; i++) {
   1522 		index = indices[i];
   1523 		if (mdb_drop(txn->mdb_txn, index, drop_dbi)) {
   1524 			fprintf(stderr, "ndb_rebuild_pubkey_index: mdb_drop failed for %s\n", ndb_db_name(index));
   1525 			return -1;
   1526 		}
   1527 	}
   1528 
   1529 	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE], &cur))) {
   1530 		fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc);
   1531 		return -1;
   1532 	}
   1533 
   1534 	count = 0;
   1535 
   1536 	// loop through all notes and write search indices
   1537 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   1538 		note = v.mv_data;
   1539 		note_key = *((uint64_t*)k.mv_data);
   1540 
   1541 		for (i = 0; i < num_indices; i++) {
   1542 			index = indices[i];
   1543 			switch (index) {
   1544 			case NDB_DB_NOTE:
   1545 			case NDB_DB_META:
   1546 			case NDB_DB_PROFILE:
   1547 			case NDB_DB_NOTE_ID:
   1548 			case NDB_DB_NDB_META:
   1549 			case NDB_DB_PROFILE_SEARCH:
   1550 			case NDB_DB_PROFILE_LAST_FETCH:
   1551 			case NDB_DBS:
   1552 				// this should never happen since we check at
   1553 				// the start
   1554 				count = -1;
   1555 				goto cleanup;
   1556 			case NDB_DB_PROFILE_PK:
   1557 			case NDB_DB_NOTE_KIND:
   1558 			case NDB_DB_NOTE_TEXT:
   1559 			case NDB_DB_NOTE_BLOCKS:
   1560 			case NDB_DB_NOTE_TAGS:
   1561 				fprintf(stderr, "%s index rebuild not supported yet. sorry.\n", ndb_db_name(index));
   1562 				count = -1;
   1563 				goto cleanup;
   1564 			case NDB_DB_NOTE_PUBKEY:
   1565 				if (!ndb_write_note_pubkey_index(txn, note, note_key)) {
   1566 					count = -1;
   1567 					goto cleanup;
   1568 				}
   1569 				break;
   1570 			case NDB_DB_NOTE_PUBKEY_KIND:
   1571 				if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) {
   1572 					count = -1;
   1573 					goto cleanup;
   1574 				}
   1575 				break;
   1576 			}
   1577 		}
   1578 
   1579 		count++;
   1580 	}
   1581 
   1582 cleanup:
   1583 	mdb_cursor_close(cur);
   1584 
   1585 	return count;
   1586 }
   1587 
   1588 
   1589 // Migrations
   1590 //
   1591 
   1592 // This was before we had note_profile_pubkey{,_kind} indices. Let's create them.
   1593 static int ndb_migrate_profile_indices(struct ndb_txn *txn)
   1594 {
   1595 	int count;
   1596 
   1597 	enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND};
   1598 	if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) {
   1599 		fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count);
   1600 		return 1;
   1601 	} else {
   1602 		fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n");
   1603 		return 0;
   1604 	}
   1605 }
   1606 
   1607 static int ndb_migrate_user_search_indices(struct ndb_txn *txn)
   1608 {
   1609 	int rc;
   1610 	MDB_cursor *cur;
   1611 	MDB_val k, v;
   1612 	void *profile_root;
   1613 	NdbProfileRecord_table_t record;
   1614 	struct ndb_note *note;
   1615 	uint64_t note_key, profile_key;
   1616 	size_t len;
   1617 	int count;
   1618 
   1619 	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) {
   1620 		fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc);
   1621 		return 0;
   1622 	}
   1623 
   1624 	count = 0;
   1625 
   1626 	// loop through all profiles and write search indices
   1627 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   1628 		profile_root = v.mv_data;
   1629 		profile_key = *((uint64_t*)k.mv_data);
   1630 		record = NdbProfileRecord_as_root(profile_root);
   1631 		note_key = NdbProfileRecord_note_key(record);
   1632 		note = ndb_get_note_by_key(txn, note_key, &len);
   1633 
   1634 		if (note == NULL) {
   1635 			continue;
   1636 		}
   1637 
   1638 		if (!ndb_write_profile_search_indices(txn, note, profile_key,
   1639 						      profile_root)) {
   1640 			fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
   1641 			continue;
   1642 		}
   1643 
   1644 		count++;
   1645 	}
   1646 
   1647 	fprintf(stderr, "migrated %d profiles to include search indices\n", count);
   1648 
   1649 	mdb_cursor_close(cur);
   1650 
   1651 	return 1;
   1652 }
   1653 
   1654 static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn)
   1655 {
   1656 	// just drop the search db so we can rebuild it
   1657 	if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) {
   1658 		fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n");
   1659 		return 0;
   1660 	}
   1661 
   1662 	return ndb_migrate_user_search_indices(txn);
   1663 }
   1664 
// Forward declaration: needed by ndb_migrate_utf8_profile_names, which
// reprocesses stored profile notes before the definition appears.
int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile);
   1666 
   1667 
   1668 int ndb_db_version(struct ndb_txn *txn)
   1669 {
   1670 	uint64_t version, version_key;
   1671 	MDB_val k, v;
   1672 
   1673 	version_key = NDB_META_KEY_VERSION;
   1674 	k.mv_data = &version_key;
   1675 	k.mv_size = sizeof(version_key);
   1676 
   1677 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) {
   1678 		version = -1;
   1679 	} else {
   1680 		if (v.mv_size != 8) {
   1681 			fprintf(stderr, "run_migrations: invalid version size?");
   1682 			return 0;
   1683 		}
   1684 		version = *((uint64_t*)v.mv_data);
   1685 	}
   1686 
   1687 	return version;
   1688 }
   1689 
   1690 // custom pubkey+kind+timestamp comparison function. This is used by lmdb to
   1691 // perform b+ tree searches over the pubkey+kind+timestamp index
   1692 static int ndb_id_u64_ts_compare(const MDB_val *a, const MDB_val *b)
   1693 {
   1694 	struct ndb_id_u64_ts *tsa, *tsb;
   1695 	MDB_val a2 = *a, b2 = *b;
   1696 
   1697 	a2.mv_size = sizeof(tsa->id);
   1698 	b2.mv_size = sizeof(tsb->id);
   1699 
   1700 	int cmp = mdb_cmp_memn(&a2, &b2);
   1701 	if (cmp) return cmp;
   1702 
   1703 	tsa = a->mv_data;
   1704 	tsb = b->mv_data;
   1705 
   1706 	if (tsa->u64 < tsb->u64)
   1707 		return -1;
   1708 	else if (tsa->u64 > tsb->u64)
   1709 		return 1;
   1710 
   1711 	if (tsa->timestamp < tsb->timestamp)
   1712 		return -1;
   1713 	else if (tsa->timestamp > tsb->timestamp)
   1714 		return 1;
   1715 
   1716 	return 0;
   1717 }
   1718 
   1719 // custom kind+timestamp comparison function. This is used by lmdb to perform
   1720 // b+ tree searches over the kind+timestamp index
   1721 static int ndb_u64_ts_compare(const MDB_val *a, const MDB_val *b)
   1722 {
   1723 	struct ndb_u64_ts *tsa, *tsb;
   1724 	tsa = a->mv_data;
   1725 	tsb = b->mv_data;
   1726 
   1727 	if (tsa->u64 < tsb->u64)
   1728 		return -1;
   1729 	else if (tsa->u64 > tsb->u64)
   1730 		return 1;
   1731 
   1732 	if (tsa->timestamp < tsb->timestamp)
   1733 		return -1;
   1734 	else if (tsa->timestamp > tsb->timestamp)
   1735 		return 1;
   1736 
   1737 	return 0;
   1738 }
   1739 
   1740 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b)
   1741 {
   1742 	struct ndb_tsid *tsa, *tsb;
   1743 	MDB_val a2 = *a, b2 = *b;
   1744 
   1745 	a2.mv_size = sizeof(tsa->id);
   1746 	b2.mv_size = sizeof(tsb->id);
   1747 
   1748 	int cmp = mdb_cmp_memn(&a2, &b2);
   1749 	if (cmp) return cmp;
   1750 
   1751 	tsa = a->mv_data;
   1752 	tsb = b->mv_data;
   1753 
   1754 	if (tsa->timestamp < tsb->timestamp)
   1755 		return -1;
   1756 	else if (tsa->timestamp > tsb->timestamp)
   1757 		return 1;
   1758 	return 0;
   1759 }
   1760 
// Kinds of messages that can be sent to the ingester
enum ndb_ingester_msgtype {
	NDB_INGEST_EVENT, // write json to the ingester queue for processing
	NDB_INGEST_QUIT,  // kill ingester thread immediately
};
   1765 
// Payload of an NDB_INGEST_EVENT message.
// NOTE(review): `len` is presumably the byte length of `json`; being a
// 31-bit field it cannot exceed 2^31 - 1 -- confirm against the producer.
struct ndb_ingester_event {
	char *json;
	unsigned client : 1; // ["EVENT", {...}] messages
	unsigned len : 31;
};
   1771 
// A note handed to the writer, together with its size in bytes
struct ndb_writer_note {
	struct ndb_note *note;
	size_t note_len;
};
   1776 
// A profile note handed to the writer, together with the profile record
// builder produced by processing that note
struct ndb_writer_profile {
	struct ndb_writer_note note;
	struct ndb_profile_record_builder record;
};
   1781 
// A message for the ingester: `type` says which kind of message this is;
// the only payload defined is `event`
struct ndb_ingester_msg {
	enum ndb_ingester_msgtype type;
	union {
		struct ndb_ingester_event event;
	};
};
   1788 
// Writer payload carrying a database version
struct ndb_writer_ndb_meta {
	// these are 64 bit because I'm paranoid of db-wide alignment issues
	uint64_t version;
};
   1793 
// Used in the writer thread when writing ndb_profile_fetch_record's
//   kv = pubkey: record
struct ndb_writer_last_fetch {
	unsigned char pubkey[32];
	uint64_t fetched_at;
};
   1800 
// Writer payload for writing note blocks: the blocks plus the key of the
// note they belong to
struct ndb_writer_blocks {
	struct ndb_blocks *blocks;
	uint64_t note_key;
};
   1806 
// The different types of messages that the writer thread can write to the
// database. `type` selects which member of the union is meaningful.
struct ndb_writer_msg {
	enum ndb_writer_msgtype type;
	union {
		struct ndb_writer_note note;
		struct ndb_writer_profile profile;
		struct ndb_writer_ndb_meta ndb_meta;
		struct ndb_writer_last_fetch last_fetch;
		struct ndb_writer_blocks blocks;
	};
};
   1819 
   1820 static inline int ndb_writer_queue_msg(struct ndb_writer *writer,
   1821 				       struct ndb_writer_msg *msg)
   1822 {
   1823 	return prot_queue_push(&writer->inbox, msg);
   1824 }
   1825 
   1826 static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags);
   1827 static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn)
   1828 {
   1829 	int rc;
   1830 	MDB_cursor *cur;
   1831 	MDB_val k, v;
   1832 	void *profile_root;
   1833 	NdbProfileRecord_table_t record;
   1834 	struct ndb_note *note, *copied_note;
   1835 	uint64_t note_key;
   1836 	size_t len;
   1837 	int count, failed, ret;
   1838 	struct ndb_writer_profile profile;
   1839 
   1840 	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) {
   1841 		fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc);
   1842 		return 0;
   1843 	}
   1844 
   1845 	size_t scratch_size = 8 * 1024 * 1024;
   1846 	unsigned char *scratch = malloc(scratch_size);
   1847 
   1848 	ret = 1;
   1849 	count = 0;
   1850 	failed = 0;
   1851 
   1852 	// loop through all profiles and write search indices
   1853 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   1854 		profile_root = v.mv_data;
   1855 		record = NdbProfileRecord_as_root(profile_root);
   1856 		note_key = NdbProfileRecord_note_key(record);
   1857 		note = ndb_get_note_by_key(txn, note_key, &len);
   1858 
   1859 		if (note == NULL) {
   1860 			failed++;
   1861 			continue;
   1862 		}
   1863 
   1864 		struct ndb_profile_record_builder *b = &profile.record;
   1865 
   1866 		// reprocess profile
   1867 		if (!ndb_process_profile_note(note, b)) {
   1868 			failed++;
   1869 			continue;
   1870 		}
   1871 
   1872 		// the writer needs to own this note, and its expected to free it
   1873 		copied_note = malloc(len);
   1874 		memcpy(copied_note, note, len);
   1875 
   1876 		profile.note.note = copied_note;
   1877 		profile.note.note_len = len;
   1878 
   1879 		// we don't pass in flags when migrating... a bit sketchy but
   1880 		// whatever. noone is using this to customize nostrdb atm
   1881 		if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) {
   1882 			count++;
   1883 		}
   1884 	}
   1885 
   1886 	fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count);
   1887 
   1888 	if (failed != 0) {
   1889 		fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed);
   1890 	}
   1891 
   1892 	free(scratch);
   1893 	mdb_cursor_close(cur);
   1894 
   1895 	return ret;
   1896 }
   1897 
// Table of database migrations.
// NOTE(review): presumably these run in array order based on the stored db
// version, so new migrations should only be appended -- confirm against the
// code that walks this table.
static struct ndb_migration MIGRATIONS[] = {
	{ .fn = ndb_migrate_user_search_indices },
	{ .fn = ndb_migrate_lower_user_search_indices },
	{ .fn = ndb_migrate_utf8_profile_names },
	{ .fn = ndb_migrate_profile_indices },
};
   1904 
   1905 
   1906 int ndb_end_query(struct ndb_txn *txn)
   1907 {
   1908 	// this works on read or write queries.
   1909 	return mdb_txn_commit(txn->mdb_txn) == 0;
   1910 }
   1911 
   1912 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
   1913 		    unsigned char sig[64])
   1914 {
   1915 	secp256k1_xonly_pubkey xonly_pubkey;
   1916 	int ok;
   1917 
   1918 	ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey,
   1919 					  pubkey) != 0;
   1920 	if (!ok) return 0;
   1921 
   1922 	ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32,
   1923 					 &xonly_pubkey) > 0;
   1924 	if (!ok) return 0;
   1925 
   1926 	return 1;
   1927 }
   1928 
   1929 static int ndb_writer_queue_note(struct ndb_writer *writer,
   1930 				 struct ndb_note *note, size_t note_len)
   1931 {
   1932 	struct ndb_writer_msg msg;
   1933 	msg.type = NDB_WRITER_NOTE;
   1934 
   1935 	msg.note.note = note;
   1936 	msg.note.note_len = note_len;
   1937 
   1938 	return prot_queue_push(&writer->inbox, &msg);
   1939 }
   1940 
   1941 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
   1942 					  const unsigned char *pubkey,
   1943 					  uint64_t fetched_at)
   1944 {
   1945 	int rc;
   1946 	MDB_val key, val;
   1947 
   1948 	key.mv_data = (unsigned char*)pubkey;
   1949 	key.mv_size = 32;
   1950 	val.mv_data = &fetched_at;
   1951 	val.mv_size = sizeof(fetched_at);
   1952 
   1953 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH],
   1954 			  &key, &val, 0)))
   1955 	{
   1956 		ndb_debug("write version to ndb_meta failed: %s\n",
   1957 				mdb_strerror(rc));
   1958 		return;
   1959 	}
   1960 
   1961 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
   1962 }
   1963 
   1964 
   1965 // We just received a profile that we haven't processed yet, but it could
   1966 // be an older one! Make sure we only write last fetched profile if it's a new
   1967 // one
   1968 //
   1969 // To do this, we first check the latest profile in the database. If the
   1970 // created_date for this profile note is newer, then we write a
   1971 // last_profile_fetch record, otherwise we do not.
   1972 //
   1973 // WARNING: This function is only valid when called from the writer thread
   1974 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn,
   1975 					       struct ndb_note *note)
   1976 {
   1977 	size_t len;
   1978 	uint64_t profile_key, note_key;
   1979 	void *root;
   1980 	struct ndb_note *last_profile;
   1981 	NdbProfileRecord_table_t record;
   1982 
   1983 	if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) {
   1984 		record = NdbProfileRecord_as_root(root);
   1985 		note_key = NdbProfileRecord_note_key(record);
   1986 		last_profile = ndb_get_note_by_key(txn, note_key, &len);
   1987 		if (last_profile == NULL) {
   1988 			return 0;
   1989 		}
   1990 
   1991 		// found profile, let's see if it's newer than ours
   1992 		if (note->created_at > last_profile->created_at) {
   1993 			// this is a new profile note, record last fetched time
   1994 			ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
   1995 		}
   1996 	} else {
   1997 		// couldn't fetch profile. record last fetched time
   1998 		ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL));
   1999 	}
   2000 
   2001 	return 1;
   2002 }
   2003 
   2004 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
   2005 				 uint64_t fetched_at)
   2006 {
   2007 	struct ndb_writer_msg msg;
   2008 	msg.type = NDB_WRITER_PROFILE_LAST_FETCH;
   2009 	memcpy(&msg.last_fetch.pubkey[0], pubkey, 32);
   2010 	msg.last_fetch.fetched_at = fetched_at;
   2011 
   2012 	return ndb_writer_queue_msg(&ndb->writer, &msg);
   2013 }
   2014 
   2015 
   2016 // When doing cursor scans from greatest to lowest, this function positions the
   2017 // cursor at the first element before descending. MDB_SET_RANGE puts us right
   2018 // after the first element, so we have to go back one.
   2019 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v)
   2020 {
   2021 	// Position cursor at the next key greater than or equal to the
   2022 	// specified key
   2023 	if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) {
   2024 		// Failed :(. It could be the last element?
   2025 		if (mdb_cursor_get(cur, k, v, MDB_LAST))
   2026 			return 0;
   2027 	} else {
   2028 		// if set range worked and our key exists, it should be
   2029 		// the one right before this one
   2030 		if (mdb_cursor_get(cur, k, v, MDB_PREV))
   2031 			return 0;
   2032 	}
   2033 
   2034 	return 1;
   2035 }
   2036 
   2037 // get some value based on a clustered id key
   2038 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id,
   2039 		 MDB_val *val)
   2040 {
   2041 	MDB_val k, v;
   2042 	MDB_cursor *cur;
   2043 	int success = 0, rc;
   2044 	struct ndb_tsid tsid;
   2045 
   2046 	// position at the most recent
   2047 	ndb_tsid_high(&tsid, id);
   2048 
   2049 	k.mv_data = &tsid;
   2050 	k.mv_size = sizeof(tsid);
   2051 
   2052 	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) {
   2053 		ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc));
   2054 		return 0;
   2055 	}
   2056 
   2057 	if (!ndb_cursor_start(cur, &k, &v))
   2058 		goto cleanup;
   2059 
   2060 	if (memcmp(k.mv_data, id, 32) == 0) {
   2061 		*val = v;
   2062 		success = 1;
   2063 	}
   2064 
   2065 cleanup:
   2066 	mdb_cursor_close(cur);
   2067 	return success;
   2068 }
   2069 
   2070 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key,
   2071 			       enum ndb_dbs store, size_t *len)
   2072 {
   2073 	MDB_val k, v;
   2074 
   2075 	k.mv_data = &key;
   2076 	k.mv_size = sizeof(key);
   2077 
   2078 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
   2079 		ndb_debug("ndb_lookup_by_key: mdb_get note failed\n");
   2080 		return NULL;
   2081 	}
   2082 
   2083 	if (len)
   2084 		*len = v.mv_size;
   2085 
   2086 	return v.mv_data;
   2087 }
   2088 
   2089 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind,
   2090 			     enum ndb_dbs store, const unsigned char *pk,
   2091 			     size_t *len, uint64_t *primkey)
   2092 {
   2093 	MDB_val k, v;
   2094 	void *res = NULL;
   2095 	if (len)
   2096 		*len = 0;
   2097 
   2098 	if (!ndb_get_tsid(txn, ind, pk, &k)) {
   2099 		//ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n");
   2100 		return 0;
   2101 	}
   2102 
   2103 	if (primkey)
   2104 		*primkey = *(uint64_t*)k.mv_data;
   2105 
   2106 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
   2107 		ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
   2108 		return 0;
   2109 	}
   2110 
   2111 	res = v.mv_data;
   2112 	assert(((uint64_t)res % 4) == 0);
   2113 	if (len)
   2114 		*len = v.mv_size;
   2115 	return res;
   2116 }
   2117 
   2118 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key)
   2119 {
   2120 	return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key);
   2121 }
   2122 
   2123 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key)
   2124 {
   2125 	return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key);
   2126 }
   2127 
   2128 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn,
   2129 					      enum ndb_dbs db,
   2130 					      const unsigned char *id)
   2131 {
   2132 	MDB_val k;
   2133 
   2134 	if (!ndb_get_tsid(txn, db, id, &k))
   2135 		return 0;
   2136 
   2137 	return *(uint32_t*)k.mv_data;
   2138 }
   2139 
   2140 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id)
   2141 {
   2142 	return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id);
   2143 }
   2144 
   2145 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id)
   2146 {
   2147 	return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id);
   2148 }
   2149 
   2150 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
   2151 {
   2152 	return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len);
   2153 }
   2154 
   2155 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len)
   2156 {
   2157 	return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len);
   2158 }
   2159 
   2160 uint64_t
   2161 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey)
   2162 {
   2163 	MDB_val k, v;
   2164 
   2165 	k.mv_data = (unsigned char*)pubkey;
   2166 	k.mv_size = 32;
   2167 
   2168 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) {
   2169 		//ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n");
   2170 		return 0;
   2171 	}
   2172 
   2173 	return *((uint64_t*)v.mv_data);
   2174 }
   2175 
   2176 
   2177 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id)
   2178 {
   2179 	MDB_val val;
   2180 
   2181 	if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val))
   2182 		return 0;
   2183 
   2184 	return 1;
   2185 }
   2186 
   2187 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb,
   2188 			     MDB_txn *mdb_txn)
   2189 {
   2190 	txn->lmdb = lmdb;
   2191 	txn->mdb_txn = mdb_txn;
   2192 }
   2193 
   2194 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid)
   2195 {
   2196 	unsigned char id[32];
   2197 	struct ndb_ingest_controller *c = data;
   2198 	struct ndb_txn txn;
   2199 
   2200 	hex_decode(hexid, 64, id, sizeof(id));
   2201 
   2202 	// let's see if we already have it
   2203 
   2204 	ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
   2205 	if (!ndb_has_note(&txn, id))
   2206 		return NDB_IDRES_CONT;
   2207 
   2208 	return NDB_IDRES_STOP;
   2209 }
   2210 
   2211 static int ndbprofile_parse_json(flatcc_builder_t *B,
   2212         const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile)
   2213 {
   2214 	flatcc_json_parser_t parser, *ctx = &parser;
   2215 	flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags);
   2216 
   2217 	if (flatcc_builder_start_buffer(B, 0, 0, 0))
   2218 		return 0;
   2219 
   2220 	NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile);
   2221 	if (ctx->error)
   2222 		return 0;
   2223 
   2224 	if (!flatcc_builder_end_buffer(B, *profile))
   2225 		return 0;
   2226 
   2227 	ctx->end_loc = buf;
   2228 
   2229 
   2230 	return 1;
   2231 }
   2232 
   2233 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b)
   2234 {
   2235 	b->builder = malloc(sizeof(*b->builder));
   2236 	b->flatbuf = NULL;
   2237 }
   2238 
   2239 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b)
   2240 {
   2241 	if (b->builder)
   2242 		free(b->builder);
   2243 	if (b->flatbuf)
   2244 		free(b->flatbuf);
   2245 
   2246 	b->builder = NULL;
   2247 	b->flatbuf = NULL;
   2248 }
   2249 
   2250 int ndb_process_profile_note(struct ndb_note *note,
   2251 			     struct ndb_profile_record_builder *profile)
   2252 {
   2253 	int res;
   2254 
   2255 	NdbProfile_ref_t profile_table;
   2256 	flatcc_builder_t *builder;
   2257 
   2258 	ndb_profile_record_builder_init(profile);
   2259 	builder = profile->builder;
   2260 	flatcc_builder_init(builder);
   2261 
   2262 	NdbProfileRecord_start_as_root(builder);
   2263 
   2264 	//printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note));
   2265 	if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note),
   2266 					  note->content_length,
   2267 					  flatcc_json_parser_f_skip_unknown,
   2268 					  &profile_table)))
   2269 	{
   2270 		ndb_debug("profile_parse_json failed %d '%.*s'\n", res,
   2271 			  note->content_length, ndb_note_content(note));
   2272 		ndb_profile_record_builder_free(profile);
   2273 		return 0;
   2274 	}
   2275 
   2276 	uint64_t received_at = time(NULL);
   2277 	const char *lnurl = "fixme";
   2278 
   2279 	NdbProfileRecord_profile_add(builder, profile_table);
   2280 	NdbProfileRecord_received_at_add(builder, received_at);
   2281 
   2282 	flatcc_builder_ref_t lnurl_off;
   2283 	lnurl_off = flatcc_builder_create_string_str(builder, lnurl);
   2284 
   2285 	NdbProfileRecord_lnurl_add(builder, lnurl_off);
   2286 
   2287 	//*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len);
   2288 	return 1;
   2289 }
   2290 
   2291 static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
   2292 				    char *json, unsigned len, unsigned client)
   2293 {
   2294 	struct ndb_ingester_msg msg;
   2295 	msg.type = NDB_INGEST_EVENT;
   2296 
   2297 	msg.event.json = json;
   2298 	msg.event.len = len;
   2299 	msg.event.client = client;
   2300 
   2301 	return threadpool_dispatch(&ingester->tp, &msg);
   2302 }
   2303 
   2304 
   2305 static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
   2306 			    int len, unsigned client)
   2307 {
   2308 	// Since we need to return as soon as possible, and we're not
   2309 	// making any assumptions about the lifetime of the string, we
   2310 	// definitely need to copy the json here. In the future once we
   2311 	// have our thread that manages a websocket connection, we can
   2312 	// avoid the copy and just use the buffer we get from that
   2313 	// thread.
   2314 	char *json_copy = strdupn(json, len);
   2315 	if (json_copy == NULL)
   2316 		return 0;
   2317 
   2318 	return ndb_ingester_queue_event(ingester, json_copy, len, client);
   2319 }
   2320 
   2321 
   2322 static int ndb_ingester_process_note(secp256k1_context *ctx,
   2323 				     struct ndb_note *note,
   2324 				     size_t note_size,
   2325 				     struct ndb_writer_msg *out,
   2326 				     struct ndb_ingester *ingester)
   2327 {
   2328 	enum ndb_ingest_filter_action action;
   2329 	action = NDB_INGEST_ACCEPT;
   2330 
   2331 	if (ingester->filter)
   2332 		action = ingester->filter(ingester->filter_context, note);
   2333 
   2334 	if (action == NDB_INGEST_REJECT)
   2335 		return 0;
   2336 
   2337 	// some special situations we might want to skip sig validation,
   2338 	// like during large imports
   2339 	if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
   2340 		// if we're skipping validation we don't need to verify
   2341 	} else {
   2342 		// verify! If it's an invalid note we don't need to
   2343 		// bother writing it to the database
   2344 		if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
   2345 			ndb_debug("signature verification failed\n");
   2346 			return 0;
   2347 		}
   2348 	}
   2349 
   2350 	// we didn't find anything. let's send it
   2351 	// to the writer thread
   2352 	note = realloc(note, note_size);
   2353 	assert(((uint64_t)note % 4) == 0);
   2354 
   2355 	if (note->kind == 0) {
   2356 		struct ndb_profile_record_builder *b =
   2357 			&out->profile.record;
   2358 
   2359 		ndb_process_profile_note(note, b);
   2360 
   2361 		out->type = NDB_WRITER_PROFILE;
   2362 		out->profile.note.note = note;
   2363 		out->profile.note.note_len = note_size;
   2364 		return 1;
   2365 	} else if (note->kind == 6) {
   2366 		// process the repost if we have a repost event
   2367 		ndb_debug("processing kind 6 repost\n");
   2368 		ndb_ingest_event(ingester, ndb_note_content(note),
   2369 					   ndb_note_content_length(note), 0);
   2370 	}
   2371 
   2372 	out->type = NDB_WRITER_NOTE;
   2373 	out->note.note = note;
   2374 	out->note.note_len = note_size;
   2375 
   2376 	return 1;
   2377 }
   2378 
   2379 
   2380 static int ndb_ingester_process_event(secp256k1_context *ctx,
   2381 				      struct ndb_ingester *ingester,
   2382 				      struct ndb_ingester_event *ev,
   2383 				      struct ndb_writer_msg *out,
   2384 				      MDB_txn *read_txn
   2385 				      )
   2386 {
   2387 	struct ndb_tce tce;
   2388 	struct ndb_fce fce;
   2389 	struct ndb_note *note;
   2390 	struct ndb_ingest_controller controller;
   2391 	struct ndb_id_cb cb;
   2392 	void *buf;
   2393 	int ok;
   2394 	size_t bufsize, note_size;
   2395 
   2396 	ok = 0;
   2397 
   2398 	// we will use this to check if we already have it in the DB during
   2399 	// ID parsing
   2400 	controller.read_txn = read_txn;
   2401 	controller.lmdb = ingester->lmdb;
   2402 	cb.fn = ndb_ingester_json_controller;
   2403 	cb.data = &controller;
   2404 
   2405 	// since we're going to be passing this allocated note to a different
   2406 	// thread, we can't use thread-local buffers. just allocate a block
   2407         bufsize = max(ev->len * 8.0, 4096);
   2408 	buf = malloc(bufsize);
   2409 	if (!buf) {
   2410 		ndb_debug("couldn't malloc buf\n");
   2411 		return 0;
   2412 	}
   2413 
   2414 	note_size =
   2415 		ev->client ?
   2416 		ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
   2417 		ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
   2418 
   2419 	if ((int)note_size == -42) {
   2420 		// we already have this!
   2421 		//ndb_debug("already have id??\n");
   2422 		goto cleanup;
   2423 	} else if (note_size == 0) {
   2424 		ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json);
   2425 		goto cleanup;
   2426 	}
   2427 
   2428 	//ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json);
   2429 
   2430 	if (ev->client) {
   2431 		switch (fce.evtype) {
   2432 		case NDB_FCE_EVENT:
   2433 			note = fce.event.note;
   2434 			if (note != buf) {
   2435 				ndb_debug("note buffer not equal to malloc'd buffer\n");
   2436 				goto cleanup;
   2437 			}
   2438 
   2439 			if (!ndb_ingester_process_note(ctx, note, note_size,
   2440 						       out, ingester)) {
   2441 				ndb_debug("failed to process note\n");
   2442 				goto cleanup;
   2443 			} else {
   2444 				// we're done with the original json, free it
   2445 				free(ev->json);
   2446 				return 1;
   2447 			}
   2448 		}
   2449 	} else {
   2450 		switch (tce.evtype) {
   2451 		case NDB_TCE_AUTH:   goto cleanup;
   2452 		case NDB_TCE_NOTICE: goto cleanup;
   2453 		case NDB_TCE_EOSE:   goto cleanup;
   2454 		case NDB_TCE_OK:     goto cleanup;
   2455 		case NDB_TCE_EVENT:
   2456 			note = tce.event.note;
   2457 			if (note != buf) {
   2458 				ndb_debug("note buffer not equal to malloc'd buffer\n");
   2459 				goto cleanup;
   2460 			}
   2461 
   2462 			if (!ndb_ingester_process_note(ctx, note, note_size,
   2463 						       out, ingester)) {
   2464 				ndb_debug("failed to process note\n");
   2465 				goto cleanup;
   2466 			} else {
   2467 				// we're done with the original json, free it
   2468 				free(ev->json);
   2469 				return 1;
   2470 			}
   2471 		}
   2472 	}
   2473 
   2474 
   2475 cleanup:
   2476 	free(ev->json);
   2477 	free(buf);
   2478 
   2479 	return ok;
   2480 }
   2481 
   2482 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
   2483 {
   2484 	MDB_cursor *mc;
   2485 	MDB_val key, val;
   2486 
   2487 	if (mdb_cursor_open(txn, db, &mc))
   2488 		return 0;
   2489 
   2490 	if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) {
   2491 		mdb_cursor_close(mc);
   2492 		return 0;
   2493 	}
   2494 
   2495 	mdb_cursor_close(mc);
   2496 
   2497 	assert(key.mv_size == 8);
   2498         return *((uint64_t*)key.mv_data);
   2499 }
   2500 
   2501 //
   2502 // make a search key meant for user queries without any other note info
   2503 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search)
   2504 {
   2505 	memset(key->id, 0, sizeof(key->id));
   2506 	key->timestamp = 0;
   2507 	lowercase_strncpy(key->search, search, sizeof(key->search) - 1);
   2508 	key->search[sizeof(key->search) - 1] = '\0';
   2509 }
   2510 
   2511 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query)
   2512 {
   2513 	int rc;
   2514 	struct ndb_search_key s;
   2515 	MDB_val k, v;
   2516 	search->cursor = NULL;
   2517 
   2518 	MDB_cursor **cursor = (MDB_cursor **)&search->cursor;
   2519 
   2520 	ndb_make_search_key_low(&s, query);
   2521 
   2522 	k.mv_data = &s;
   2523 	k.mv_size = sizeof(s);
   2524 
   2525 	if ((rc = mdb_cursor_open(txn->mdb_txn,
   2526 				  txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH],
   2527 				  cursor))) {
   2528 		printf("search_profile: cursor opened failed: %s\n",
   2529 				mdb_strerror(rc));
   2530 		return 0;
   2531 	}
   2532 
   2533 	// Position cursor at the next key greater than or equal to the specified key
   2534 	if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) {
   2535 		printf("search_profile: cursor get failed\n");
   2536 		goto cleanup;
   2537 	} else {
   2538 		search->key = k.mv_data;
   2539 		assert(v.mv_size == 8);
   2540 		search->profile_key = *((uint64_t*)v.mv_data);
   2541 		return 1;
   2542 	}
   2543 
   2544 cleanup:
   2545 	mdb_cursor_close(search->cursor);
   2546 	search->cursor = NULL;
   2547 	return 0;
   2548 }
   2549 
   2550 void ndb_search_profile_end(struct ndb_search *search)
   2551 {
   2552 	if (search->cursor)
   2553 		mdb_cursor_close(search->cursor);
   2554 }
   2555 
   2556 int ndb_search_profile_next(struct ndb_search *search)
   2557 {
   2558 	int rc;
   2559 	MDB_val k, v;
   2560 	unsigned char *init_id;
   2561 
   2562 	init_id = search->key->id;
   2563 	k.mv_data = search->key;
   2564 	k.mv_size = sizeof(*search->key);
   2565 
   2566 retry:
   2567 	if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) {
   2568 		ndb_debug("ndb_search_profile_next: %s\n",
   2569 				mdb_strerror(rc));
   2570 		return 0;
   2571 	} else {
   2572 		search->key = k.mv_data;
   2573 		assert(v.mv_size == 8);
   2574 		search->profile_key = *((uint64_t*)v.mv_data);
   2575 
   2576 		// skip duplicate pubkeys
   2577 		if (!memcmp(init_id, search->key->id, 32))
   2578 			goto retry;
   2579 	}
   2580 
   2581 	return 1;
   2582 }
   2583 
   2584 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
   2585 {
   2586 	int cmp;
   2587 	struct ndb_search_key *ska, *skb;
   2588 
   2589 	ska = a->mv_data;
   2590 	skb = b->mv_data;
   2591 
   2592 	MDB_val a2 = *a;
   2593 	MDB_val b2 = *b;
   2594 
   2595 	a2.mv_data = ska->search;
   2596 	a2.mv_size = sizeof(ska->search) + sizeof(ska->id);
   2597 
   2598 	cmp = mdb_cmp_memn(&a2, &b2);
   2599 	if (cmp) return cmp;
   2600 
   2601 	if (ska->timestamp < skb->timestamp)
   2602 		return -1;
   2603 	else if (ska->timestamp > skb->timestamp)
   2604 		return 1;
   2605 
   2606 	return 0;
   2607 }
   2608 
   2609 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key)
   2610 
   2611 {
   2612 	MDB_val key, val;
   2613 	int rc;
   2614 	struct ndb_tsid tsid;
   2615 	MDB_dbi pk_db;
   2616 
   2617 	pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK];
   2618 
   2619 	// write profile_pk + created_at index
   2620 	ndb_tsid_init(&tsid, note->pubkey, note->created_at);
   2621 
   2622 	key.mv_data = &tsid;
   2623 	key.mv_size = sizeof(tsid);
   2624 	val.mv_data = &profile_key;
   2625 	val.mv_size = sizeof(profile_key);
   2626 
   2627 	if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) {
   2628 		ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n",
   2629 				profile_key, mdb_strerror(rc));
   2630 		return 0;
   2631 	}
   2632 
   2633 	return 1;
   2634 }
   2635 
   2636 static int ndb_write_profile(struct ndb_txn *txn,
   2637 			     struct ndb_writer_profile *profile,
   2638 			     uint64_t note_key)
   2639 {
   2640 	uint64_t profile_key;
   2641 	struct ndb_note *note;
   2642 	void *flatbuf;
   2643 	size_t flatbuf_len;
   2644 	int rc;
   2645 
   2646 	MDB_val key, val;
   2647 	MDB_dbi profile_db;
   2648 
   2649 	note = profile->note.note;
   2650 
   2651 	// add note_key to profile record
   2652 	NdbProfileRecord_note_key_add(profile->record.builder, note_key);
   2653 	NdbProfileRecord_end_as_root(profile->record.builder);
   2654 
   2655 	flatbuf = profile->record.flatbuf =
   2656 		flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len);
   2657 
   2658 	assert(((uint64_t)flatbuf % 8) == 0);
   2659 
   2660 	// TODO: this may not be safe!?
   2661 	flatbuf_len = (flatbuf_len + 7) & ~7;
   2662 
   2663 	//assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0);
   2664 
   2665 	// get dbs
   2666 	profile_db = txn->lmdb->dbs[NDB_DB_PROFILE];
   2667 
   2668 	// get new key
   2669 	profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1;
   2670 
   2671 	// write profile to profile store
   2672 	key.mv_data = &profile_key;
   2673 	key.mv_size = sizeof(profile_key);
   2674 	val.mv_data = flatbuf;
   2675 	val.mv_size = flatbuf_len;
   2676 
   2677 	if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) {
   2678 		ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc));
   2679 		return 0;
   2680 	}
   2681 
   2682 	// write last fetched record
   2683 	if (!ndb_maybe_write_last_profile_fetch(txn, note)) {
   2684 		ndb_debug("failed to write last profile fetched record\n");
   2685 	}
   2686 
   2687 	// write profile pubkey index
   2688 	if (!ndb_write_profile_pk_index(txn, note, profile_key)) {
   2689 		ndb_debug("failed to write profile pubkey index\n");
   2690 		return 0;
   2691 	}
   2692 
   2693 	// write name, display_name profile search indices
   2694 	if (!ndb_write_profile_search_indices(txn, note, profile_key,
   2695 					      flatbuf)) {
   2696 		ndb_debug("failed to write profile search indices\n");
   2697 		return 0;
   2698 	}
   2699 
   2700 	return 1;
   2701 }
   2702 
   2703 // find the last id tag in a note (e, p, etc)
   2704 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type)
   2705 {
   2706 	unsigned char *last = NULL;
   2707 	struct ndb_iterator iter;
   2708 	struct ndb_str str;
   2709 
   2710 	// get the liked event id (last id)
   2711 	ndb_tags_iterate_start(note, &iter);
   2712 
   2713 	while (ndb_tags_iterate_next(&iter)) {
   2714 		if (iter.tag->count < 2)
   2715 			continue;
   2716 
   2717 		str = ndb_tag_str(note, iter.tag, 0);
   2718 
   2719 		// assign liked to the last e tag
   2720 		if (str.flag == NDB_PACKED_STR && str.str[0] == type) {
   2721 			str = ndb_tag_str(note, iter.tag, 1);
   2722 			if (str.flag == NDB_PACKED_ID)
   2723 				last = str.id;
   2724 		}
   2725 	}
   2726 
   2727 	return last;
   2728 }
   2729 
   2730 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len)
   2731 {
   2732 	MDB_val k, v;
   2733 
   2734 	k.mv_data = (unsigned char*)id;
   2735 	k.mv_size = 32;
   2736 
   2737 	if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) {
   2738 		//ndb_debug("ndb_get_note_meta: mdb_get note failed\n");
   2739 		return NULL;
   2740 	}
   2741 
   2742 	if (len)
   2743 		*len = v.mv_size;
   2744 
   2745 	return v.mv_data;
   2746 }
   2747 
   2748 // When receiving a reaction note, look for the liked id and increase the
   2749 // reaction counter in the note metadata database
   2750 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
   2751 {
   2752 	size_t len;
   2753 	void *root;
   2754 	int reactions, rc;
   2755 	MDB_val key, val;
   2756 	NdbEventMeta_table_t meta;
   2757 	unsigned char *liked = ndb_note_last_id_tag(note, 'e');
   2758 
   2759 	if (liked == NULL)
   2760 		return 0;
   2761 
   2762 	root = ndb_get_note_meta(txn, liked, &len);
   2763 
   2764 	flatcc_builder_t builder;
   2765 	flatcc_builder_init(&builder);
   2766 	NdbEventMeta_start_as_root(&builder);
   2767 
   2768 	// no meta record, let's make one
   2769 	if (root == NULL) {
   2770 		NdbEventMeta_reactions_add(&builder, 1);
   2771 	} else {
   2772 		// clone existing and add to it
   2773 		meta = NdbEventMeta_as_root(root);
   2774 
   2775 		reactions = NdbEventMeta_reactions_get(meta);
   2776 		NdbEventMeta_clone(&builder, meta);
   2777 		NdbEventMeta_reactions_add(&builder, reactions + 1);
   2778 	}
   2779 
   2780 	NdbProfileRecord_end_as_root(&builder);
   2781 	root = flatcc_builder_finalize_aligned_buffer(&builder, &len);
   2782 	assert(((uint64_t)root % 8) == 0);
   2783 
   2784 	if (root == NULL) {
   2785 		ndb_debug("failed to create note metadata record\n");
   2786 		return 0;
   2787 	}
   2788 
   2789 	// metadata is keyed on id because we want to collect stats regardless
   2790 	// if we have the note yet or not
   2791 	key.mv_data = liked;
   2792 	key.mv_size = 32;
   2793 
   2794 	val.mv_data = root;
   2795 	val.mv_size = len;
   2796 
   2797 	// write the new meta record
   2798 	//ndb_debug("writing stats record for ");
   2799 	//print_hex(liked, 32);
   2800 	//ndb_debug("\n");
   2801 
   2802 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) {
   2803 		ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc));
   2804 		free(root);
   2805 		return 0;
   2806 	}
   2807 
   2808 	free(root);
   2809 
   2810 	return 1;
   2811 }
   2812 
   2813 
   2814 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
   2815 				   uint64_t note_key)
   2816 
   2817 {
   2818 	struct ndb_tsid tsid;
   2819 	int rc;
   2820 	MDB_val key, val;
   2821 	MDB_dbi id_db;
   2822 
   2823 	ndb_tsid_init(&tsid, note->id, note->created_at);
   2824 
   2825 	key.mv_data = &tsid;
   2826 	key.mv_size = sizeof(tsid);
   2827 	val.mv_data = &note_key;
   2828 	val.mv_size = sizeof(note_key);
   2829 
   2830 	id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   2831 
   2832 	if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) {
   2833 		ndb_debug("write note id index to db failed: %s\n",
   2834 				mdb_strerror(rc));
   2835 		return 0;
   2836 	}
   2837 
   2838 	return 1;
   2839 }
   2840 
   2841 static int ndb_filter_group_add_filters(struct ndb_filter_group *group,
   2842 					struct ndb_filter *filters,
   2843 					int num_filters)
   2844 {
   2845 	int i;
   2846 
   2847 	for (i = 0; i < num_filters; i++) {
   2848 		if (!ndb_filter_group_add(group, &filters[i]))
   2849 			return 0;
   2850 	}
   2851 
   2852 	return 1;
   2853 }
   2854 
   2855 
   2856 static struct ndb_filter_elements *
   2857 ndb_filter_find_elements(struct ndb_filter *filter, enum ndb_filter_fieldtype typ)
   2858 {
   2859 	int i;
   2860 	struct ndb_filter_elements *els;
   2861 
   2862 	for (i = 0; i < filter->num_elements; i++) {
   2863 		els = ndb_filter_get_elements(filter, i);
   2864 		assert(els);
   2865 		if (els->field.type == typ) {
   2866 			return els;
   2867 		}
   2868 	}
   2869 
   2870 	return NULL;
   2871 }
   2872 
   2873 int ndb_filter_is_subset_of(const struct ndb_filter *a, const struct ndb_filter *b)
   2874 {
   2875 	int i;
   2876 	struct ndb_filter_elements *b_field, *a_field;
   2877 
   2878         // Everything is always a subset of {}
   2879 	if (b->num_elements == 0)
   2880 		return 1;
   2881 
   2882         // We can't be a subset if the number of elements in the other
   2883         // filter is larger then the number of elements we have.
   2884 	if (b->num_elements > a->num_elements)
   2885 		return 0;
   2886 
   2887         // If our filter count matches, we can only be a subset if we are
   2888         // equal
   2889 	if (b->num_elements == a->num_elements)
   2890 		return ndb_filter_eq(a, b);
   2891 
   2892         // If our element count is larger than the other filter, then we
   2893         // must see if every element in the other filter exists in ours. If
   2894         // so, then we are a subset of the other.
   2895         //
   2896         // eg: B={k:1, a:b} <- A={t:x, k:1, a:b}
   2897         //
   2898         // A is a subset of B because `k:1` and `a:b` both exist in A
   2899 
   2900 	for (i = 0; i < b->num_elements; i++) {
   2901 		b_field = ndb_filter_get_elements((struct ndb_filter*)b, i);
   2902 		a_field = ndb_filter_find_elements((struct ndb_filter*)a,
   2903 						   b_field->field.type);
   2904 
   2905 		if (a_field == NULL)
   2906 			return 0;
   2907 
   2908 		if (!ndb_filter_field_eq((struct ndb_filter*)a, a_field,
   2909 					 (struct ndb_filter*)b, b_field))
   2910 			return 0;
   2911 	}
   2912 
   2913 	return 1;
   2914 }
   2915 
   2916 int ndb_filter_eq(const struct ndb_filter *a, const struct ndb_filter *b)
   2917 {
   2918 	int i;
   2919 	struct ndb_filter_elements *a_els, *b_els;
   2920 
   2921 	if (a->num_elements != b->num_elements)
   2922 		return 0;
   2923 
   2924 	for (i = 0; i < a->num_elements; i++) {
   2925 		a_els = ndb_filter_get_elements((struct ndb_filter*)a, i);
   2926 		b_els = ndb_filter_find_elements((struct ndb_filter *)b,
   2927 						 a_els->field.type);
   2928 
   2929 		if (b_els == NULL)
   2930 			return 0;
   2931 
   2932 		if (!ndb_filter_field_eq((struct ndb_filter*)a, a_els,
   2933 					 (struct ndb_filter*)b, b_els))
   2934 			return 0;
   2935 	}
   2936 
   2937 	return 1;
   2938 }
   2939 
   2940 
   2941 static uint64_t *
   2942 ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ)
   2943 {
   2944 	struct ndb_filter_elements *els;
   2945 	if ((els = ndb_filter_find_elements(filter, typ)))
   2946 		return &els->elements[0];
   2947 	return NULL;
   2948 }
   2949 
   2950 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter,
   2951 				    enum ndb_filter_fieldtype typ)
   2952 {
   2953 	uint64_t *el;
   2954 	if (NULL == (el = ndb_filter_get_elem(filter, typ)))
   2955 		return NULL;
   2956 	return el;
   2957 }
   2958 
   2959 static inline int push_query_result(struct ndb_query_results *results,
   2960 				    struct ndb_query_result *result)
   2961 {
   2962 	return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result));
   2963 }
   2964 
   2965 static int compare_query_results(const void *pa, const void *pb)
   2966 {
   2967 	struct ndb_query_result *a, *b;
   2968 
   2969 	a = (struct ndb_query_result *)pa;
   2970 	b = (struct ndb_query_result *)pb;
   2971 
   2972 	if (a->note->created_at == b->note->created_at) {
   2973 		return 0;
   2974 	} else if (a->note->created_at > b->note->created_at) {
   2975 		return -1;
   2976 	} else {
   2977 		return 1;
   2978 	}
   2979 }
   2980 
   2981 static void ndb_query_result_init(struct ndb_query_result *res,
   2982 				  struct ndb_note *note,
   2983 				  uint64_t note_size,
   2984 				  uint64_t note_id)
   2985 {
   2986 	*res = (struct ndb_query_result){
   2987 		.note_id = note_id,
   2988 		.note_size = note_size,
   2989 		.note = note,
   2990 	};
   2991 }
   2992 
   2993 static int query_is_full(struct ndb_query_results *results, int limit)
   2994 {
   2995 	if (results->cur.p >= results->cur.end)
   2996 		return 1;
   2997 
   2998 	return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit;
   2999 }
   3000 
   3001 static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
   3002 				      struct ndb_filter *filter,
   3003 				      struct ndb_query_results *results,
   3004 				      int limit
   3005 				      )
   3006 {
   3007 	MDB_cursor *cur;
   3008 	MDB_dbi db;
   3009 	MDB_val k, v;
   3010 	int rc, i;
   3011 	struct ndb_filter_elements *ids;
   3012 	struct ndb_note *note;
   3013 	struct ndb_query_result res;
   3014 	struct ndb_tsid tsid, *ptsid;
   3015 	uint64_t note_id, until, *pint;
   3016 	size_t note_size;
   3017 	unsigned char *id;
   3018 
   3019 	until = UINT64_MAX;
   3020 
   3021 	if (!(ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS)))
   3022 		return 0;
   3023 
   3024 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   3025 		until = *pint;
   3026 
   3027 	db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   3028 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   3029 		return 0;
   3030 
   3031 	// for each id in our ids filter, find in the db
   3032 	for (i = 0; i < ids->count; i++) {
   3033 		if (query_is_full(results, limit))
   3034 			break;
   3035 
   3036 		id = ndb_filter_get_id_element(filter, ids, i);
   3037 		ndb_tsid_init(&tsid, (unsigned char *)id, until);
   3038 
   3039 		k.mv_data = &tsid;
   3040 		k.mv_size = sizeof(tsid);
   3041 
   3042 		if (!ndb_cursor_start(cur, &k, &v))
   3043 			continue;
   3044 
   3045 		ptsid = (struct ndb_tsid *)k.mv_data;
   3046 		note_id = *(uint64_t*)v.mv_data;
   3047 
   3048 		if (memcmp(id, ptsid->id, 32))
   3049 			continue;
   3050 
   3051 		// get the note because we need it to match against the filter
   3052 		if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   3053 			continue;
   3054 
   3055 		// Sure this particular lookup matched the index query, but
   3056 		// does it match the entire filter? Check! We also pass in
   3057 		// things we've already matched via the filter so we don't have
   3058 		// to check again. This can be pretty important for filters
   3059 		// with a large number of entries.
   3060 		if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS))
   3061 			continue;
   3062 
   3063 		ndb_query_result_init(&res, note, note_size, note_id);
   3064 		if (!push_query_result(results, &res))
   3065 			break;
   3066 	}
   3067 
   3068 	mdb_cursor_close(cur);
   3069 	return 1;
   3070 }
   3071 
   3072 //
   3073 // encode a tag index key
   3074 //
   3075 // consists of:
   3076 //
   3077 //   u8   tag
   3078 //   u8   tag_val_len
   3079 //   [u8] tag_val_bytes
   3080 //   u64  created_at
   3081 //
   3082 static int ndb_encode_tag_key(unsigned char *buf, int buf_size,
   3083 			      char tag, const unsigned char *val,
   3084 			      unsigned char val_len,
   3085 			      uint64_t timestamp)
   3086 {
   3087 	struct cursor writer;
   3088 	int ok;
   3089 
   3090 	// quick exit for obvious case where it will be too big. There can be
   3091 	// values of val_len that still fail, but we just let the writer handle
   3092 	// those failure cases
   3093 	if (val_len >= buf_size)
   3094 		return 0;
   3095 
   3096 	make_cursor(buf, buf + buf_size, &writer);
   3097 
   3098 	ok =
   3099 		cursor_push_byte(&writer, tag) &&
   3100 		cursor_push(&writer, (unsigned char*)val, val_len) &&
   3101 		cursor_push(&writer, (unsigned char*)&timestamp, sizeof(timestamp));
   3102 
   3103 	if (!ok)
   3104 		return 0;
   3105 
   3106 	return writer.p - writer.start;
   3107 }
   3108 
   3109 static int ndb_query_plan_execute_authors(struct ndb_txn *txn,
   3110 					  struct ndb_filter *filter,
   3111 					  struct ndb_query_results *results,
   3112 					  int limit)
   3113 {
   3114 	MDB_val k, v;
   3115 	MDB_cursor *cur;
   3116 	int rc, i;
   3117 	uint64_t *pint, until, since, note_key;
   3118 	unsigned char *author;
   3119 	struct ndb_note *note;
   3120 	size_t note_size;
   3121 	struct ndb_filter_elements *authors;
   3122 	struct ndb_query_result res;
   3123 	struct ndb_tsid tsid, *ptsid;
   3124 	enum ndb_dbs db;
   3125 
   3126 	db = txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY];
   3127 
   3128 	if (!(authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS)))
   3129 		return 0;
   3130 
   3131 	until = UINT64_MAX;
   3132 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   3133 		until = *pint;
   3134 
   3135 	since = 0;
   3136 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
   3137 		since = *pint;
   3138 
   3139 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   3140 		return 0;
   3141 
   3142 	for (i = 0; i < authors->count; i++) {
   3143 		author = ndb_filter_get_id_element(filter, authors, i);
   3144 
   3145 		ndb_tsid_init(&tsid, author, until);
   3146 
   3147 		k.mv_data = &tsid;
   3148 		k.mv_size = sizeof(tsid);
   3149 
   3150 		if (!ndb_cursor_start(cur, &k, &v))
   3151 			continue;
   3152 
   3153 		// for each id in our ids filter, find in the db
   3154 		while (!query_is_full(results, limit)) {
   3155 			ptsid = (struct ndb_tsid *)k.mv_data;
   3156 			note_key = *(uint64_t*)v.mv_data;
   3157 
   3158 			// don't continue the scan if we're below `since`
   3159 			if (ptsid->timestamp < since)
   3160 				break;
   3161 
   3162 			// our author should match, if not bail
   3163 			if (memcmp(author, ptsid->id, 32))
   3164 				break;
   3165 
   3166 			// fetch the note, we need it for our query results
   3167 			// and to match further against the filter
   3168 			if (!(note = ndb_get_note_by_key(txn, note_key, &note_size)))
   3169 				goto next;
   3170 
   3171 			if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_AUTHORS))
   3172 				goto next;
   3173 
   3174 			ndb_query_result_init(&res, note, note_size, note_key);
   3175 			if (!push_query_result(results, &res))
   3176 				break;
   3177 
   3178 next:
   3179 			if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   3180 				break;
   3181 		}
   3182 	}
   3183 
   3184 	mdb_cursor_close(cur);
   3185 	return 1;
   3186 }
   3187 
   3188 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn,
   3189 					     struct ndb_filter *filter,
   3190 					     struct ndb_query_results *results,
   3191 					     int limit)
   3192 {
   3193 	MDB_dbi db;
   3194 	MDB_val k, v;
   3195 	MDB_cursor *cur;
   3196 	int rc;
   3197 	struct ndb_note *note;
   3198 	struct ndb_tsid key, *pkey;
   3199 	uint64_t *pint, until, since, note_id;
   3200 	size_t note_size;
   3201 	struct ndb_query_result res;
   3202 	unsigned char high_key[32] = {0xFF};
   3203 
   3204 	db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
   3205 
   3206 	until = UINT64_MAX;
   3207 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   3208 		until = *pint;
   3209 
   3210 	since = 0;
   3211 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
   3212 		since = *pint;
   3213 
   3214 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   3215 		return 0;
   3216 
   3217 	// if we have until, start there, otherwise just use max
   3218 	ndb_tsid_init(&key, high_key, until);
   3219 	k.mv_data = &key;
   3220 	k.mv_size = sizeof(key);
   3221 
   3222 	if (!ndb_cursor_start(cur, &k, &v))
   3223 		return 1;
   3224 
   3225 	while (!query_is_full(results, limit)) {
   3226 		pkey = (struct ndb_tsid *)k.mv_data;
   3227 		note_id = *(uint64_t*)v.mv_data;
   3228 		assert(v.mv_size == 8);
   3229 
   3230 		// don't continue the scan if we're below `since`
   3231 		if (pkey->timestamp < since)
   3232 			break;
   3233 
   3234 		if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   3235 			goto next;
   3236 
   3237 		// does this entry match our filter?
   3238 		if (!ndb_filter_matches_with(filter, note, 0))
   3239 			goto next;
   3240 
   3241 		ndb_query_result_init(&res, note, (uint64_t)note_size, note_id);
   3242 		if (!push_query_result(results, &res))
   3243 			break;
   3244 next:
   3245 		if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   3246 			break;
   3247 	}
   3248 
   3249 	mdb_cursor_close(cur);
   3250 	return 1;
   3251 }
   3252 
   3253 static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
   3254 				       struct ndb_filter *filter,
   3255 				       struct ndb_query_results *results,
   3256 				       int limit)
   3257 {
   3258 	MDB_cursor *cur;
   3259 	MDB_dbi db;
   3260 	MDB_val k, v;
   3261 	int len, taglen, rc, i;
   3262 	uint64_t *pint, until, note_id;
   3263 	size_t note_size;
   3264 	unsigned char key_buffer[255];
   3265 	struct ndb_note *note;
   3266 	struct ndb_filter_elements *tags;
   3267 	unsigned char *tag;
   3268 	struct ndb_query_result res;
   3269 
   3270 	db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
   3271 
   3272 	if (!(tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS)))
   3273 		return 0;
   3274 
   3275 	until = UINT64_MAX;
   3276 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   3277 		until = *pint;
   3278 
   3279 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   3280 		return 0;
   3281 
   3282 	for (i = 0; i < tags->count; i++) {
   3283 		tag = ndb_filter_get_id_element(filter, tags, i);
   3284 
   3285 		taglen = tags->field.elem_type == NDB_ELEMENT_ID
   3286 		       ? 32 : strlen((const char*)tag);
   3287 
   3288 		if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer),
   3289 					       tags->field.tag, tag, taglen,
   3290 					       until)))
   3291 			return 0;
   3292 
   3293 		k.mv_data = key_buffer;
   3294 		k.mv_size = len;
   3295 
   3296 		if (!ndb_cursor_start(cur, &k, &v))
   3297 			continue;
   3298 
   3299 		// for each id in our ids filter, find in the db
   3300 		while (!query_is_full(results, limit)) {
   3301 			// check if tag value matches, bail if not
   3302 			if (((unsigned char *)k.mv_data)[0] != tags->field.tag)
   3303 				break;
   3304 
   3305 			// check if tag value matches, bail if not
   3306 			if (taglen != k.mv_size - 9)
   3307 				break;
   3308 
   3309 			if (memcmp((unsigned char *)k.mv_data+1, tag, k.mv_size-9))
   3310 				break;
   3311 
   3312 			note_id = *(uint64_t*)v.mv_data;
   3313 
   3314 			if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   3315 				goto next;
   3316 
   3317 			if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS))
   3318 				goto next;
   3319 
   3320 			ndb_query_result_init(&res, note, note_size, note_id);
   3321 			if (!push_query_result(results, &res))
   3322 				break;
   3323 
   3324 next:
   3325 			if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   3326 				break;
   3327 		}
   3328 	}
   3329 
   3330 	mdb_cursor_close(cur);
   3331 	return 1;
   3332 }
   3333 
   3334 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
   3335 					struct ndb_filter *filter,
   3336 					struct ndb_query_results *results,
   3337 					int limit)
   3338 {
   3339 	MDB_cursor *cur;
   3340 	MDB_dbi db;
   3341 	MDB_val k, v;
   3342 	struct ndb_note *note;
   3343 	struct ndb_u64_ts tsid, *ptsid;
   3344 	struct ndb_filter_elements *kinds;
   3345 	struct ndb_query_result res;
   3346 	uint64_t kind, note_id, until, since, *pint;
   3347 	size_t note_size;
   3348 	int i, rc;
   3349 
   3350 	// we should have kinds in a kinds filter!
   3351 	if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS)))
   3352 		return 0;
   3353 
   3354 	until = UINT64_MAX;
   3355 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
   3356 		until = *pint;
   3357 
   3358 	since = 0;
   3359 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
   3360 		since = *pint;
   3361 
   3362 	db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
   3363 
   3364 	if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
   3365 		return 0;
   3366 
   3367 	for (i = 0; i < kinds->count; i++) {
   3368 		if (query_is_full(results, limit))
   3369 			break;
   3370 
   3371 		kind = kinds->elements[i];
   3372 		ndb_debug("kind %" PRIu64 "\n", kind);
   3373 		ndb_u64_ts_init(&tsid, kind, until);
   3374 
   3375 		k.mv_data = &tsid;
   3376 		k.mv_size = sizeof(tsid);
   3377 
   3378 		if (!ndb_cursor_start(cur, &k, &v))
   3379 			continue;
   3380 
   3381 		// for each id in our ids filter, find in the db
   3382 		while (!query_is_full(results, limit)) {
   3383 			ptsid = (struct ndb_u64_ts *)k.mv_data;
   3384 			if (ptsid->u64 != kind)
   3385 				break;
   3386 
   3387 			// don't continue the scan if we're below `since`
   3388 			if (ptsid->timestamp < since)
   3389 				break;
   3390 
   3391 			note_id = *(uint64_t*)v.mv_data;
   3392 			if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
   3393 				goto next;
   3394 
   3395 			if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS))
   3396 				goto next;
   3397 
   3398 			ndb_query_result_init(&res, note, note_size, note_id);
   3399 			if (!push_query_result(results, &res))
   3400 				break;
   3401 
   3402 next:
   3403 			if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
   3404 				break;
   3405 		}
   3406 	}
   3407 
   3408 	mdb_cursor_close(cur);
   3409 	return 1;
   3410 }
   3411 
   3412 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
   3413 {
   3414 	struct ndb_filter_elements *ids, *kinds, *authors, *tags;
   3415 
   3416 	ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS);
   3417 	kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS);
   3418 	authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS);
   3419 	tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS);
   3420 
   3421 	// this is rougly similar to the heuristic in strfry's dbscan
   3422 	if (ids) {
   3423 		return NDB_PLAN_IDS;
   3424 	} else if (kinds && authors && authors->count <= 10) {
   3425 		return NDB_PLAN_AUTHOR_KINDS;
   3426 	} else if (authors && authors->count <= 10) {
   3427 		return NDB_PLAN_AUTHORS;
   3428 	} else if (tags && tags->count <= 10) {
   3429 		return NDB_PLAN_TAGS;
   3430 	} else if (kinds) {
   3431 		return NDB_PLAN_KINDS;
   3432 	}
   3433 
   3434 	return NDB_PLAN_CREATED;
   3435 }
   3436 
   3437 static const char *ndb_query_plan_name(int plan_id)
   3438 {
   3439 	switch (plan_id) {
   3440 		case NDB_PLAN_IDS:     return "ids";
   3441 		case NDB_PLAN_KINDS:   return "kinds";
   3442 		case NDB_PLAN_TAGS:    return "tags";
   3443 		case NDB_PLAN_CREATED: return "created";
   3444 		case NDB_PLAN_AUTHORS: return "authors";
   3445 	}
   3446 
   3447 	return "unknown";
   3448 }
   3449 
   3450 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
   3451 			    struct ndb_query_result *res, int capacity,
   3452 			    int *results_out)
   3453 {
   3454 	struct ndb_query_results results;
   3455 	uint64_t limit, *pint;
   3456 	enum ndb_query_plan plan;
   3457 	limit = capacity;
   3458 
   3459 	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT)))
   3460 		limit = *pint;
   3461 
   3462 	limit = min(capacity, limit);
   3463 	make_cursor((unsigned char *)res,
   3464 		    ((unsigned char *)res) + limit * sizeof(*res),
   3465 		    &results.cur);
   3466 
   3467 	plan = ndb_filter_plan(filter);
   3468 	ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan));
   3469 	switch (plan) {
   3470 	// We have a list of ids, just open a cursor and jump to each once
   3471 	case NDB_PLAN_IDS:
   3472 		if (!ndb_query_plan_execute_ids(txn, filter, &results, limit))
   3473 			return 0;
   3474 		break;
   3475 
   3476 	// We have just kinds, just scan the kind index
   3477 	case NDB_PLAN_KINDS:
   3478 		if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit))
   3479 			return 0;
   3480 		break;
   3481 
   3482 	case NDB_PLAN_TAGS:
   3483 		if (!ndb_query_plan_execute_tags(txn, filter, &results, limit))
   3484 			return 0;
   3485 		break;
   3486 	case NDB_PLAN_CREATED:
   3487 		if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit))
   3488 			return 0;
   3489 		break;
   3490 	case NDB_PLAN_AUTHORS:
   3491 		if (!ndb_query_plan_execute_authors(txn, filter, &results, limit))
   3492 			return 0;
   3493 		break;
   3494 	case NDB_PLAN_AUTHOR_KINDS:
   3495 		/* TODO: author kinds
   3496 		if (!ndb_query_plan_execute_author_kinds(txn, filter, &results, limit))
   3497 			return 0;
   3498 			*/
   3499 		if (!ndb_query_plan_execute_authors(txn, filter, &results, limit))
   3500 			return 0;
   3501 		break;
   3502 	}
   3503 
   3504 	*results_out = cursor_count(&results.cur, sizeof(*res));
   3505 	return 1;
   3506 }
   3507 
   3508 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters,
   3509 	      struct ndb_query_result *results, int result_capacity, int *count)
   3510 {
   3511 	int i, out;
   3512 	struct ndb_query_result *p = results;
   3513 
   3514 	out = 0;
   3515 	*count = 0;
   3516 
   3517 	for (i = 0; i < num_filters; i++) {
   3518 		if (!ndb_query_filter(txn, &filters[i], p,
   3519 				      result_capacity, &out)) {
   3520 			return 0;
   3521 		}
   3522 
   3523 		*count += out;
   3524 		p += out;
   3525 		result_capacity -= out;
   3526 		if (result_capacity <= 0)
   3527 			break;
   3528 	}
   3529 
   3530 	// sort results
   3531 	qsort(results, *count, sizeof(*results), compare_query_results);
   3532 	return 1;
   3533 }
   3534 
   3535 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note,
   3536 				    uint64_t note_key)
   3537 {
   3538 	unsigned char key_buffer[255];
   3539 	struct ndb_iterator iter;
   3540 	struct ndb_str tkey, tval;
   3541 	char tchar;
   3542 	int len, rc;
   3543 	MDB_val key, val;
   3544 	MDB_dbi tags_db;
   3545 
   3546 	tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
   3547 
   3548 	ndb_tags_iterate_start(note, &iter);
   3549 
   3550 	while (ndb_tags_iterate_next(&iter)) {
   3551 		if (iter.tag->count < 2)
   3552 			continue;
   3553 
   3554 		tkey = ndb_tag_str(note, iter.tag, 0);
   3555 
   3556 		// we only write indices for 1-char tags.
   3557 		tchar = tkey.str[0];
   3558 		if (tchar == 0 || tkey.str[1] != 0)
   3559 			continue;
   3560 
   3561 		tval = ndb_tag_str(note, iter.tag, 1);
   3562 		len = ndb_str_len(&tval);
   3563 
   3564 		if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer),
   3565 					       tchar, tval.id, (unsigned char)len,
   3566 					       ndb_note_created_at(note)))) {
   3567 			// this will fail when we try to encode a key that is
   3568 			// too big
   3569 			continue;
   3570 		}
   3571 
   3572 		//ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len);
   3573 
   3574 		key.mv_data = key_buffer;
   3575 		key.mv_size = len;
   3576 
   3577 		val.mv_data = &note_key;
   3578 		val.mv_size = sizeof(note_key);
   3579 
   3580 		if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) {
   3581 			ndb_debug("write note tag index to db failed: %s\n",
   3582 					mdb_strerror(rc));
   3583 			return 0;
   3584 		}
   3585 	}
   3586 
   3587 	return 1;
   3588 }
   3589 
   3590 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
   3591 				     uint64_t note_key)
   3592 {
   3593 	struct ndb_u64_ts tsid;
   3594 	int rc;
   3595 	MDB_val key, val;
   3596 	MDB_dbi kind_db;
   3597 
   3598 	ndb_u64_ts_init(&tsid, note->kind, note->created_at);
   3599 
   3600 	key.mv_data = &tsid;
   3601 	key.mv_size = sizeof(tsid);
   3602 	val.mv_data = &note_key;
   3603 	val.mv_size = sizeof(note_key);
   3604 
   3605 	kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
   3606 
   3607 	if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) {
   3608 		ndb_debug("write note kind index to db failed: %s\n",
   3609 				mdb_strerror(rc));
   3610 		return 0;
   3611 	}
   3612 
   3613 	return 1;
   3614 }
   3615 
   3616 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word,
   3617 				   int word_len, int word_index,
   3618 				   uint64_t timestamp, uint64_t note_id)
   3619 {
   3620 	// cap to some reasonable key size
   3621 	unsigned char buffer[1024];
   3622 	int keysize, rc;
   3623 	MDB_val k, v;
   3624 	MDB_dbi text_db;
   3625 
   3626 	// build our compressed text index key
   3627 	if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index,
   3628 				      word_len, word, timestamp, note_id,
   3629 				      &keysize)) {
   3630 		// probably too big
   3631 
   3632 		return 0;
   3633 	}
   3634 
   3635 	k.mv_data = buffer;
   3636 	k.mv_size = keysize;
   3637 
   3638 	v.mv_data = NULL;
   3639 	v.mv_size = 0;
   3640 
   3641 	text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
   3642 
   3643 	if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) {
   3644 		ndb_debug("write note text index to db failed: %s\n",
   3645 				mdb_strerror(rc));
   3646 		return 0;
   3647 	}
   3648 
   3649 	return 1;
   3650 }
   3651 
   3652 
   3653 
   3654 // break a string into individual words for querying or for building the
   3655 // fulltext search index. This is callback based so we don't need to
   3656 // build up an intermediate structure
   3657 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn)
   3658 {
   3659 	int word_len, words;
   3660 	const char *word;
   3661 
   3662 	words = 0;
   3663 
   3664 	while (cur->p < cur->end) {
   3665 		consume_whitespace_or_punctuation(cur);
   3666 		if (cur->p >= cur->end)
   3667 			break;
   3668 		word = (const char *)cur->p;
   3669 
   3670 		if (!consume_until_boundary(cur))
   3671 			break;
   3672 
   3673 		// start of word or end
   3674 		word_len = cur->p - (unsigned char *)word;
   3675 		if (word_len == 0 && cur->p >= cur->end)
   3676 			break;
   3677 
   3678 		if (word_len == 0) {
   3679 			if (!cursor_skip(cur, 1))
   3680 				break;
   3681 			continue;
   3682 		}
   3683 
   3684 		//ndb_debug("writing word index '%.*s'\n", word_len, word);
   3685 
   3686 		if (!fn(ctx, word, word_len, words))
   3687 			continue;
   3688 
   3689 		words++;
   3690 	}
   3691 
   3692 	return 1;
   3693 }
   3694 
   3695 struct ndb_word_writer_ctx
   3696 {
   3697 	struct ndb_txn *txn;
   3698 	struct ndb_note *note;
   3699 	uint64_t note_id;
   3700 };
   3701 
   3702 static int ndb_fulltext_word_writer(void *ctx,
   3703 		const char *word, int word_len, int words)
   3704 {
   3705 	struct ndb_word_writer_ctx *wctx = ctx;
   3706 
   3707 	if (!ndb_write_word_to_index(wctx->txn, word, word_len, words,
   3708 				     wctx->note->created_at, wctx->note_id)) {
   3709 		// too big to write this one, just skip it
   3710 		ndb_debug("failed to write word '%.*s' to index\n", word_len, word);
   3711 
   3712 		return 0;
   3713 	}
   3714 
   3715 	//fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word);
   3716 	return 1;
   3717 }
   3718 
   3719 static int ndb_write_note_fulltext_index(struct ndb_txn *txn,
   3720 					 struct ndb_note *note,
   3721 					 uint64_t note_id)
   3722 {
   3723 	struct cursor cur;
   3724 	unsigned char *content;
   3725 	struct ndb_str str;
   3726 	struct ndb_word_writer_ctx ctx;
   3727 
   3728 	str = ndb_note_str(note, &note->content);
   3729 	// I don't think this should happen?
   3730 	if (unlikely(str.flag == NDB_PACKED_ID))
   3731 		return 0;
   3732 
   3733 	content = (unsigned char *)str.str;
   3734 
   3735 	make_cursor(content, content + note->content_length, &cur);
   3736 
   3737 	ctx.txn = txn;
   3738 	ctx.note = note;
   3739 	ctx.note_id = note_id;
   3740 
   3741 	ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer);
   3742 
   3743 	return 1;
   3744 }
   3745 
   3746 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index)
   3747 {
   3748 	(void)word_index;
   3749 	struct ndb_search_words *words = ctx;
   3750 	struct ndb_word *word;
   3751 
   3752 	if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS)
   3753 		return 0;
   3754 
   3755 	word = &words->words[words->num_words++];
   3756 	word->word = word_str;
   3757 	word->word_len = word_len;
   3758 
   3759 	return 1;
   3760 }
   3761 
   3762 static void ndb_search_words_init(struct ndb_search_words *words)
   3763 {
   3764 	words->num_words = 0;
   3765 }
   3766 
   3767 static int prefix_count(const char *str1, int len1, const char *str2, int len2) {
   3768 	int i, count = 0;
   3769 	int min_len = len1 < len2 ? len1 : len2;
   3770 
   3771 	for (i = 0; i < min_len; i++) {
   3772 		// case insensitive
   3773 		if (tolower(str1[i]) == tolower(str2[i]))
   3774 			count++;
   3775 		else
   3776 			break;
   3777 	}
   3778 
   3779 	return count;
   3780 }
   3781 
   3782 static int ndb_prefix_matches(struct ndb_text_search_result *result,
   3783 			      struct ndb_word *search_word)
   3784 {
   3785 	// Empty strings shouldn't happen but let's
   3786 	if (result->key.str_len < 2 || search_word->word_len < 2)
   3787 		return 0;
   3788 
   3789 	// make sure we at least have two matching prefix characters. exact
   3790 	// matches are nice but range searches allow us to match prefixes as
   3791 	// well. A double-char prefix is suffient, but maybe we could up this
   3792 	// in the future.
   3793 	//
   3794 	// TODO: How are we handling utf-8 prefix matches like
   3795 	// japanese?
   3796 	//
   3797 	if (   result->key.str[0] != tolower(search_word->word[0])
   3798 	    && result->key.str[1] != tolower(search_word->word[1])
   3799 	    )
   3800 		return 0;
   3801 
   3802 	// count the number of prefix-matched characters. This will be used
   3803 	// for ranking search results
   3804 	result->prefix_chars = prefix_count(result->key.str,
   3805 					    result->key.str_len,
   3806 					    search_word->word,
   3807 					    search_word->word_len);
   3808 
   3809 	if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5))
   3810 		return 0;
   3811 
   3812 	return 1;
   3813 }
   3814 
   3815 // This is called when scanning the full text search index. Scanning stops
   3816 // when we no longer have a prefix match for the word
   3817 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op,
   3818 	MDB_val *k, struct ndb_word *search_word,
   3819 	struct ndb_text_search_result *last_result,
   3820 	struct ndb_text_search_result *result,
   3821 	MDB_cursor_op order_op)
   3822 {
   3823 	struct cursor key_cursor;
   3824 	//struct ndb_text_search_key search_key;
   3825 	MDB_val v;
   3826 	int retries;
   3827 	retries = -1;
   3828 
   3829 	make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor);
   3830 
   3831 	// When op is MDB_SET_RANGE, this initializes the search. Position
   3832 	// the cursor at the next key greater than or equal to the specified
   3833 	// key.
   3834 	//
   3835 	// Subsequent searches should use MDB_NEXT
   3836 	if (mdb_cursor_get(cursor, k, &v, op)) {
   3837 		// we should only do this if we're going in reverse
   3838 		if (op == MDB_SET_RANGE && order_op == MDB_PREV) {
   3839 			// if set range worked and our key exists, it should be
   3840 			// the one right before this one
   3841 			if (mdb_cursor_get(cursor, k, &v, MDB_PREV))
   3842 				return 0;
   3843 		} else {
   3844 			return 0;
   3845 		}
   3846 	}
   3847 
   3848 retry:
   3849 	retries++;
   3850 	/*
   3851 	printf("continuing from ");
   3852 	if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) {
   3853 		ndb_print_text_search_key(&search_key);
   3854 	} else { printf("??"); }
   3855 	printf("\n");
   3856 	*/
   3857 
   3858 	make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor);
   3859 
   3860 	if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) {
   3861 		fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n");
   3862 		return 0;
   3863 	}
   3864 
   3865 	if (last_result) {
   3866 		if (last_result->key.note_id != result->key.note_id)
   3867 			return 0;
   3868 	}
   3869 
   3870 	// On success, this could still be not related at all.
   3871 	// It could just be adjacent to the word. Let's check
   3872 	// if we have a matching prefix at least.
   3873 
   3874 	// Before we unpack the entire key, let's quickly
   3875 	// unpack just the string to check the prefix. We don't
   3876 	// need to unpack the entire key if the prefix doesn't
   3877 	// match
   3878 	if (!ndb_unpack_text_search_key_string(&key_cursor,
   3879 					       &result->key.str,
   3880 					       &result->key.str_len)) {
   3881 		// this should never happen
   3882 		fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n");
   3883 		return 0;
   3884 	}
   3885 
   3886 	if (!ndb_prefix_matches(result, search_word)) {
   3887 		/*
   3888 		printf("result prefix '%.*s' didn't match search word '%.*s'\n",
   3889 			result->key.str_len, result->key.str,
   3890 			search_word->word_len, search_word->word);
   3891 			*/
   3892 		// we should only do this if we're going in reverse
   3893 		if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) {
   3894 			// if set range worked and our key exists, it should be
   3895 			// the one right before this one
   3896 			mdb_cursor_get(cursor, k, &v, MDB_PREV);
   3897 			goto retry;
   3898 		} else {
   3899 			return 0;
   3900 		}
   3901 	}
   3902 
   3903 	// Unpack the remaining text search key, we will need this information
   3904 	// when building up our search results.
   3905 	if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) {
   3906 		// This should never happen
   3907 		fprintf(stderr, "UNUSUAL: failed to unpack text search key\n");
   3908 		return 0;
   3909 	}
   3910 
   3911 			/*
   3912 	if (last_result) {
   3913 		if (result->key.word_index < last_result->key.word_index) {
   3914 			fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n",
   3915 					result->key.str_len, result->key.str,
   3916 					last_result->key.str_len, last_result->key.str);
   3917 			return 0;
   3918 		}
   3919 	}
   3920 					*/
   3921 
   3922 	return 1;
   3923 }
   3924 
   3925 static void ndb_text_search_results_init(
   3926 		struct ndb_text_search_results *results) {
   3927 	results->num_results = 0;
   3928 }
   3929 
   3930 void ndb_default_text_search_config(struct ndb_text_search_config *cfg)
   3931 {
   3932 	cfg->order = NDB_ORDER_DESCENDING;
   3933 	cfg->limit = MAX_TEXT_SEARCH_RESULTS;
   3934 }
   3935 
   3936 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg,
   3937 				     enum ndb_search_order order)
   3938 {
   3939 	cfg->order = order;
   3940 }
   3941 
   3942 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit)
   3943 {
   3944 	cfg->limit = limit;
   3945 }
   3946 
   3947 int ndb_text_search(struct ndb_txn *txn, const char *query,
   3948 		    struct ndb_text_search_results *results,
   3949 		    struct ndb_text_search_config *config)
   3950 {
   3951 	unsigned char buffer[1024], *buf;
   3952 	unsigned char saved_buf[1024], *saved;
   3953 	struct ndb_text_search_result *result, *last_result;
   3954 	struct ndb_text_search_result candidate, last_candidate;
   3955 	struct ndb_search_words search_words;
   3956 	//struct ndb_text_search_key search_key;
   3957 	struct ndb_word *search_word;
   3958 	struct cursor cur;
   3959 	ndb_text_search_key_order_fn key_order_fn;
   3960 	MDB_dbi text_db;
   3961 	MDB_cursor *cursor;
   3962 	MDB_val k, v;
   3963 	int i, j, keysize, saved_size, limit;
   3964 	MDB_cursor_op op, order_op;
   3965 
   3966 	saved = NULL;
   3967 	ndb_text_search_results_init(results);
   3968 	ndb_search_words_init(&search_words);
   3969 
   3970 	// search config
   3971 	limit = MAX_TEXT_SEARCH_RESULTS;
   3972 	order_op = MDB_PREV;
   3973 	key_order_fn = ndb_make_text_search_key_high;
   3974 	if (config) {
   3975 		if (config->order == NDB_ORDER_ASCENDING) {
   3976 			order_op = MDB_NEXT;
   3977 			key_order_fn = ndb_make_text_search_key_low;
   3978 		}
   3979 		limit = min(limit, config->limit);
   3980 	}
   3981 	// end search config
   3982 
   3983 	text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
   3984 	make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur);
   3985 
   3986 	ndb_parse_words(&cur, &search_words, ndb_parse_search_words);
   3987 	if (search_words.num_words == 0)
   3988 		return 0;
   3989 
   3990 	if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) {
   3991 		fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i);
   3992 		return 0;
   3993 	}
   3994 
   3995 	// for each word, we recursively find all of the submatches
   3996 	while (results->num_results < limit) {
   3997 		last_result = NULL;
   3998 		result = &results->results[results->num_results];
   3999 
   4000 		// if we have saved, then we continue from the last root search
   4001 		// sequence
   4002 		if (saved) {
   4003 			buf = saved_buf;
   4004 			saved = NULL;
   4005 			keysize = saved_size;
   4006 
   4007 			k.mv_data = buf;
   4008 			k.mv_size = saved_size;
   4009 
   4010 			// reposition the cursor so we can continue
   4011 			if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE))
   4012 				break;
   4013 
   4014 			op = order_op;
   4015 		} else {
   4016 			// construct a packed fulltext search key using this
   4017 			// word this key doesn't contain any timestamp or index
   4018 			// info, so it should range match instead of exact
   4019 			// match
   4020 			if (!key_order_fn(buffer, sizeof(buffer),
   4021 					  search_words.words[0].word_len,
   4022 					  search_words.words[0].word, &keysize))
   4023 			{
   4024 				// word is too big to fit in 1024-sized key
   4025 				continue;
   4026 			}
   4027 
   4028 			buf = buffer;
   4029 			op = MDB_SET_RANGE;
   4030 		}
   4031 
   4032 		for (j = 0; j < search_words.num_words; j++) {
   4033 			search_word = &search_words.words[j];
   4034 
   4035 			// shouldn't happen but let's be defensive a bit
   4036 			if (search_word->word_len == 0)
   4037 				continue;
   4038 
   4039 			// if we already matched a note in this phrase, make
   4040 			// sure we're including the note id in the query
   4041 			if (last_result) {
   4042 				// we are narrowing down a search.
   4043 				// if we already have this note id, just continue
   4044 				for (i = 0; i < results->num_results; i++) {
   4045 					if (results->results[i].key.note_id == last_result->key.note_id)
   4046 						goto cont;
   4047 				}
   4048 
   4049 				if (!ndb_make_noted_text_search_key(
   4050 					buffer, sizeof(buffer),
   4051 					search_word->word_len,
   4052 					search_word->word,
   4053 					last_result->key.timestamp,
   4054 					last_result->key.note_id,
   4055 					&keysize))
   4056 				{
   4057 					continue;
   4058 				}
   4059 
   4060 				buf = buffer;
   4061 			}
   4062 
   4063 			k.mv_data = buf;
   4064 			k.mv_size = keysize;
   4065 
   4066 			if (!ndb_text_search_next_word(cursor, op, &k,
   4067 						       search_word,
   4068 						       last_result,
   4069 						       &candidate,
   4070 						       order_op)) {
   4071 				break;
   4072 			}
   4073 
   4074 			*result = candidate;
   4075 			op = MDB_SET_RANGE;
   4076 
   4077 			// save the first key match, since we will continue from
   4078 			// this on the next root word result
   4079 			if (j == 0 && !saved) {
   4080 				memcpy(saved_buf, k.mv_data, k.mv_size);
   4081 				saved = saved_buf;
   4082 				saved_size = k.mv_size;
   4083 			}
   4084 
   4085 			last_candidate = *result;
   4086 			last_result = &last_candidate;
   4087 		}
   4088 
   4089 cont:
   4090 		// we matched all of the queries!
   4091 		if (j == search_words.num_words) {
   4092 			results->num_results++;
   4093 		} else if (j == 0) {
   4094 			break;
   4095 		}
   4096 
   4097 	}
   4098 
   4099 	mdb_cursor_close(cursor);
   4100 
   4101 	return 1;
   4102 }
   4103 
   4104 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key,
   4105 			     struct ndb_blocks *blocks)
   4106 {
   4107 	int rc;
   4108 	MDB_val key, val;
   4109 
   4110 	// make sure we're not writing the owned flag to the db
   4111 	blocks->flags &= ~NDB_BLOCK_FLAG_OWNED;
   4112 
   4113 	key.mv_data = &note_key;
   4114 	key.mv_size = sizeof(note_key);
   4115 	val.mv_data = blocks;
   4116 	val.mv_size = ndb_blocks_total_size(blocks);
   4117 	assert((val.mv_size % 8) == 0);
   4118 
   4119 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) {
   4120 		ndb_debug("write version to note_blocks failed: %s\n",
   4121 				mdb_strerror(rc));
   4122 		return;
   4123 	}
   4124 }
   4125 
   4126 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note,
   4127 				uint64_t note_key, unsigned char *scratch,
   4128 				size_t scratch_size)
   4129 {
   4130 	size_t content_len;
   4131 	const char *content;
   4132 	struct ndb_blocks *blocks;
   4133 
   4134 	content_len = ndb_note_content_length(note);
   4135 	content = ndb_note_content(note);
   4136 
   4137 	if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) {
   4138 		//ndb_debug("failed to parse content '%.*s'\n", content_len, content);
   4139 		return 0;
   4140 	}
   4141 
   4142 	ndb_write_blocks(txn, note_key, blocks);
   4143 	return 1;
   4144 }
   4145 
   4146 static uint64_t ndb_write_note(struct ndb_txn *txn,
   4147 			       struct ndb_writer_note *note,
   4148 			       unsigned char *scratch, size_t scratch_size,
   4149 			       uint32_t ndb_flags)
   4150 {
   4151 	int rc;
   4152 	uint64_t note_key;
   4153 	MDB_dbi note_db;
   4154 	MDB_val key, val;
   4155 
   4156 	// let's quickly sanity check if we already have this note
   4157 	if (ndb_get_notekey_by_id(txn, note->note->id))
   4158 		return 0;
   4159 
   4160 	// get dbs
   4161 	note_db = txn->lmdb->dbs[NDB_DB_NOTE];
   4162 
   4163 	// get new key
   4164 	note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1;
   4165 
   4166 	// write note to event store
   4167 	key.mv_data = &note_key;
   4168 	key.mv_size = sizeof(note_key);
   4169 	val.mv_data = note->note;
   4170 	val.mv_size = note->note_len;
   4171 
   4172 	if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) {
   4173 		ndb_debug("write note to db failed: %s\n", mdb_strerror(rc));
   4174 		return 0;
   4175 	}
   4176 
   4177 	ndb_write_note_id_index(txn, note->note, note_key);
   4178 	ndb_write_note_kind_index(txn, note->note, note_key);
   4179 	ndb_write_note_tag_index(txn, note->note, note_key);
   4180 	ndb_write_note_pubkey_index(txn, note->note, note_key);
   4181 	ndb_write_note_pubkey_kind_index(txn, note->note, note_key);
   4182 
   4183 	// only parse content and do fulltext index on text and longform notes
   4184 	if (note->note->kind == 1 || note->note->kind == 30023) {
   4185 		if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_FULLTEXT)) {
   4186 			if (!ndb_write_note_fulltext_index(txn, note->note, note_key))
   4187 				return 0;
   4188 		}
   4189 
   4190 		// write note blocks
   4191 		if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_NOTE_BLOCKS)) {
   4192 			ndb_write_new_blocks(txn, note->note, note_key, scratch, scratch_size);
   4193 		}
   4194 	}
   4195 
   4196 	if (note->note->kind == 7 && !ndb_flag_set(ndb_flags, NDB_FLAG_NO_STATS)) {
   4197 		ndb_write_reaction_stats(txn, note->note);
   4198 	}
   4199 
   4200 	return note_key;
   4201 }
   4202 
   4203 static void ndb_monitor_lock(struct ndb_monitor *mon) {
   4204 	pthread_mutex_lock(&mon->mutex);
   4205 }
   4206 
   4207 static void ndb_monitor_unlock(struct ndb_monitor *mon) {
   4208 	pthread_mutex_unlock(&mon->mutex);
   4209 }
   4210 
// A note that was written during the current writer batch. The writer thread
// collects these and hands them to ndb_notify_subscriptions after commit.
struct written_note {
	// key of the stored note: the writer thread assigns the value
	// returned by ndb_write_note / ndb_write_note_and_profile here, and
	// this is the value pushed to matching subscription inboxes
	uint64_t note_id;
	// the writer message payload that holds the note itself
	struct ndb_writer_note *note;
};
   4215 
   4216 // When the data has been committed to the database, take all of the written
   4217 // notes, check them against subscriptions, and then write to the subscription
   4218 // inbox for all matching notes
   4219 static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
   4220 				     struct written_note *wrote, int num_notes)
   4221 {
   4222 	int i, k;
   4223 	int pushed;
   4224 	struct written_note *written;
   4225 	struct ndb_note *note;
   4226 	struct ndb_subscription *sub;
   4227 
   4228 	ndb_monitor_lock(monitor);
   4229 
   4230 	for (i = 0; i < monitor->num_subscriptions; i++) {
   4231 		sub = &monitor->subscriptions[i];
   4232 		ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
   4233 
   4234 		pushed = 0;
   4235 		for (k = 0; k < num_notes; k++) {
   4236 			written = &wrote[k];
   4237 			note = written->note->note;
   4238 
   4239 			if (ndb_filter_group_matches(&sub->group, note)) {
   4240 				ndb_debug("pushing note\n");
   4241 
   4242 				if (!prot_queue_push(&sub->inbox, &written->note_id)) {
   4243 					ndb_debug("couldn't push note to subscriber");
   4244 				} else {
   4245 					pushed++;
   4246 				}
   4247 			} else {
   4248 				ndb_debug("not pushing note\n");
   4249 			}
   4250 		}
   4251 
   4252 		// After pushing all of the matching notes, check to see if we
   4253 		// have a registered subscription callback. If so, we call it.
   4254 		// The callback needs to call ndb_poll_for_notes to pull data
   4255 		// that was just pushed to the queue in the for loop above.
   4256 		if (monitor->sub_cb != NULL && pushed > 0) {
   4257 			monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
   4258 		}
   4259 	}
   4260 
   4261 	ndb_monitor_unlock(monitor);
   4262 }
   4263 
   4264 uint64_t ndb_write_note_and_profile(
   4265 		struct ndb_txn *txn,
   4266 		struct ndb_writer_profile *profile,
   4267 		unsigned char *scratch,
   4268 		size_t scratch_size,
   4269 		uint32_t ndb_flags)
   4270 {
   4271 	uint64_t note_nkey;
   4272 
   4273 	note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags);
   4274 
   4275 	if (profile->record.builder) {
   4276 		// only write if parsing didn't fail
   4277 		ndb_write_profile(txn, profile, note_nkey);
   4278 	}
   4279 
   4280 	return note_nkey;
   4281 }
   4282 
   4283 // only to be called from the writer thread
   4284 static int ndb_write_version(struct ndb_txn *txn, uint64_t version)
   4285 {
   4286 	int rc;
   4287 	MDB_val key, val;
   4288 	uint64_t version_key;
   4289 
   4290 	version_key = NDB_META_KEY_VERSION;
   4291 
   4292 	key.mv_data = &version_key;
   4293 	key.mv_size = sizeof(version_key);
   4294 	val.mv_data = &version;
   4295 	val.mv_size = sizeof(version);
   4296 
   4297 	if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
   4298 		ndb_debug("write version to ndb_meta failed: %s\n",
   4299 				mdb_strerror(rc));
   4300 		return 0;
   4301 	}
   4302 
   4303 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
   4304 	return 1;
   4305 }
   4306 
   4307 
   4308 static int ndb_run_migrations(struct ndb_txn *txn)
   4309 {
   4310 	int64_t version, latest_version, i;
   4311 
   4312 	latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
   4313 
   4314 	if ((version = ndb_db_version(txn)) == -1) {
   4315 		ndb_debug("run_migrations: no version found, assuming new db\n");
   4316 		version = latest_version;
   4317 
   4318 		// no version found. fresh db?
   4319 		if (!ndb_write_version(txn, version)) {
   4320 			fprintf(stderr, "run_migrations: failed writing db version");
   4321 			return 0;
   4322 		}
   4323 
   4324 		return 1;
   4325 	} else {
   4326 		ndb_debug("ndb: version %" PRIu64 " found\n", version);
   4327 	}
   4328 
   4329 	if (version < latest_version)
   4330 		fprintf(stderr, "nostrdb: migrating v%d -> v%d\n",
   4331 				(int)version, (int)latest_version);
   4332 
   4333 	for (i = version; i < latest_version; i++) {
   4334 		if (!MIGRATIONS[i].fn(txn)) {
   4335 			fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1));
   4336 			return 0;
   4337 		}
   4338 
   4339 		if (!ndb_write_version(txn, i+1)) {
   4340 			fprintf(stderr, "run_migrations: failed writing db version");
   4341 			return 0;
   4342 		}
   4343 
   4344 		version = i+1;
   4345 	}
   4346 
   4347 	return 1;
   4348 }
   4349 
   4350 
   4351 static void *ndb_writer_thread(void *data)
   4352 {
   4353 	ndb_debug("started writer thread\n");
   4354 	struct ndb_writer *writer = data;
   4355 	struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
   4356 	struct written_note written_notes[THREAD_QUEUE_BATCH];
   4357 	size_t scratch_size;
   4358 	int i, popped, done, needs_commit, num_notes;
   4359 	uint64_t note_nkey;
   4360 	struct ndb_txn txn;
   4361 	unsigned char *scratch;
   4362 
   4363 	// 8mb scratch buffer for parsing note content
   4364 	scratch_size = 8 * 1024 * 1024;
   4365 	scratch = malloc(scratch_size);
   4366 	MDB_txn *mdb_txn = NULL;
   4367 	ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
   4368 
   4369 	done = 0;
   4370 	while (!done) {
   4371 		txn.mdb_txn = NULL;
   4372 		num_notes = 0;
   4373 		ndb_debug("writer waiting for items\n");
   4374 		popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
   4375 		ndb_debug("writer popped %d items\n", popped);
   4376 
   4377 		needs_commit = 0;
   4378 		for (i = 0 ; i < popped; i++) {
   4379 			msg = &msgs[i];
   4380 			switch (msg->type) {
   4381 			case NDB_WRITER_NOTE: needs_commit = 1; break;
   4382 			case NDB_WRITER_PROFILE: needs_commit = 1; break;
   4383 			case NDB_WRITER_DBMETA: needs_commit = 1; break;
   4384 			case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break;
   4385 			case NDB_WRITER_BLOCKS: needs_commit = 1; break;
   4386 			case NDB_WRITER_MIGRATE: needs_commit = 1; break;
   4387 			case NDB_WRITER_QUIT: break;
   4388 			}
   4389 		}
   4390 
   4391 		if (needs_commit && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn))
   4392 		{
   4393 			fprintf(stderr, "writer thread txn_begin failed");
   4394 			// should definitely not happen unless DB is full
   4395 			// or something ?
   4396 			continue;
   4397 		}
   4398 
   4399 		for (i = 0; i < popped; i++) {
   4400 			msg = &msgs[i];
   4401 
   4402 			switch (msg->type) {
   4403 			case NDB_WRITER_QUIT:
   4404 				// quits are handled before this
   4405 				ndb_debug("writer thread got quit message\n");
   4406 				done = 1;
   4407 				continue;
   4408 			case NDB_WRITER_PROFILE:
   4409 				note_nkey =
   4410 					ndb_write_note_and_profile(
   4411 						&txn,
   4412 						&msg->profile,
   4413 						scratch,
   4414 						scratch_size,
   4415 						writer->ndb_flags);
   4416 
   4417 				if (note_nkey > 0) {
   4418 					written_notes[num_notes++] =
   4419 					(struct written_note){
   4420 						.note_id = note_nkey,
   4421 						.note = &msg->profile.note,
   4422 					};
   4423 				} else {
   4424 					ndb_debug("failed to write note\n");
   4425 				}
   4426 				break;
   4427 			case NDB_WRITER_NOTE:
   4428 				note_nkey = ndb_write_note(&txn, &msg->note,
   4429 							   scratch,
   4430 							   scratch_size,
   4431 							   writer->ndb_flags);
   4432 
   4433 				if (note_nkey > 0) {
   4434 					written_notes[num_notes++] = (struct written_note){
   4435 						.note_id = note_nkey,
   4436 						.note = &msg->note,
   4437 					};
   4438 				}
   4439 				break;
   4440 			case NDB_WRITER_DBMETA:
   4441 				ndb_write_version(&txn, msg->ndb_meta.version);
   4442 				break;
   4443 			case NDB_WRITER_BLOCKS:
   4444 				ndb_write_blocks(&txn, msg->blocks.note_key,
   4445 						       msg->blocks.blocks);
   4446 				break;
   4447 			case NDB_WRITER_MIGRATE:
   4448 				if (!ndb_run_migrations(&txn)) {
   4449 					mdb_txn_abort(txn.mdb_txn);
   4450 					goto bail;
   4451 				}
   4452 				break;
   4453 			case NDB_WRITER_PROFILE_LAST_FETCH:
   4454 				ndb_writer_last_profile_fetch(&txn,
   4455 						msg->last_fetch.pubkey,
   4456 						msg->last_fetch.fetched_at
   4457 						);
   4458 				break;
   4459 			}
   4460 		}
   4461 
   4462 		// commit writes
   4463 		if (needs_commit) {
   4464 			if (!ndb_end_query(&txn)) {
   4465 				ndb_debug("writer thread txn commit failed\n");
   4466 			} else {
   4467 				ndb_debug("notifying subscriptions, %d notes\n", num_notes);
   4468 				ndb_notify_subscriptions(writer->monitor,
   4469 							 written_notes,
   4470 							 num_notes);
   4471 				// update subscriptions
   4472 			}
   4473 		}
   4474 
   4475 		// free notes
   4476 		for (i = 0; i < popped; i++) {
   4477 			msg = &msgs[i];
   4478 			if (msg->type == NDB_WRITER_NOTE) {
   4479 				free(msg->note.note);
   4480 			} else if (msg->type == NDB_WRITER_PROFILE) {
   4481 				free(msg->profile.note.note);
   4482 				//ndb_profile_record_builder_free(&msg->profile.record);
   4483 			}  else if (msg->type == NDB_WRITER_BLOCKS) {
   4484 				ndb_blocks_free(msg->blocks.blocks);
   4485 			}
   4486 		}
   4487 	}
   4488 
   4489 bail:
   4490 	free(scratch);
   4491 	ndb_debug("quitting writer thread\n");
   4492 	return NULL;
   4493 }
   4494 
   4495 static void *ndb_ingester_thread(void *data)
   4496 {
   4497 	secp256k1_context *ctx;
   4498 	struct thread *thread = data;
   4499 	struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
   4500 	struct ndb_lmdb *lmdb = ingester->lmdb;
   4501 	struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
   4502 	struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
   4503 	int i, to_write, popped, done, any_event;
   4504 	MDB_txn *read_txn = NULL;
   4505 	int rc;
   4506 
   4507 	ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
   4508 	ndb_debug("started ingester thread\n");
   4509 
   4510 	done = 0;
   4511 	while (!done) {
   4512 		to_write = 0;
   4513 		any_event = 0;
   4514 
   4515 		popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
   4516 		ndb_debug("ingester popped %d items\n", popped);
   4517 
   4518 		for (i = 0; i < popped; i++) {
   4519 			msg = &msgs[i];
   4520 			if (msg->type == NDB_INGEST_EVENT) {
   4521 				any_event = 1;
   4522 				break;
   4523 			}
   4524 		}
   4525 
   4526 		if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) {
   4527 			// this is bad
   4528 			fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n",
   4529 					mdb_strerror(rc));
   4530 			continue;
   4531 		}
   4532 
   4533 		for (i = 0; i < popped; i++) {
   4534 			msg = &msgs[i];
   4535 			switch (msg->type) {
   4536 			case NDB_INGEST_QUIT:
   4537 				done = 1;
   4538 				break;
   4539 
   4540 			case NDB_INGEST_EVENT:
   4541 				out = &outs[to_write];
   4542 				if (ndb_ingester_process_event(ctx, ingester,
   4543 							       &msg->event, out,
   4544 							       read_txn)) {
   4545 					to_write++;
   4546 				}
   4547 			}
   4548 		}
   4549 
   4550 		if (any_event)
   4551 			mdb_txn_abort(read_txn);
   4552 
   4553 		if (to_write > 0) {
   4554 			ndb_debug("pushing %d events to write queue\n", to_write);
   4555 			if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) {
   4556 				ndb_debug("failed pushing %d events to write queue\n", to_write);
   4557 			}
   4558 		}
   4559 	}
   4560 
   4561 	ndb_debug("quitting ingester thread\n");
   4562 	secp256k1_context_destroy(ctx);
   4563 	return NULL;
   4564 }
   4565 
   4566 
   4567 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
   4568 			   struct ndb_monitor *monitor, uint32_t ndb_flags)
   4569 {
   4570 	writer->lmdb = lmdb;
   4571 	writer->monitor = monitor;
   4572 	writer->ndb_flags = ndb_flags;
   4573 	writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
   4574 	writer->queue_buf = malloc(writer->queue_buflen);
   4575 	if (writer->queue_buf == NULL) {
   4576 		fprintf(stderr, "ndb: failed to allocate space for writer queue");
   4577 		return 0;
   4578 	}
   4579 
   4580 	// init the writer queue.
   4581 	prot_queue_init(&writer->inbox, writer->queue_buf,
   4582 			writer->queue_buflen, sizeof(struct ndb_writer_msg));
   4583 
   4584 	// spin up the writer thread
   4585 	if (THREAD_CREATE(writer->thread_id, ndb_writer_thread, writer))
   4586 	{
   4587 		fprintf(stderr, "ndb writer thread failed to create\n");
   4588 		return 0;
   4589 	}
   4590 
   4591 	return 1;
   4592 }
   4593 
   4594 // initialize the ingester queue and then spawn the thread
   4595 static int ndb_ingester_init(struct ndb_ingester *ingester,
   4596 			     struct ndb_lmdb *lmdb,
   4597 			     struct prot_queue *writer_inbox,
   4598 			     const struct ndb_config *config)
   4599 {
   4600 	int elem_size, num_elems;
   4601 	static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT };
   4602 
   4603 	// TODO: configurable queue sizes
   4604 	elem_size = sizeof(struct ndb_ingester_msg);
   4605 	num_elems = DEFAULT_QUEUE_SIZE;
   4606 
   4607 	ingester->writer_inbox = writer_inbox;
   4608 	ingester->lmdb = lmdb;
   4609 	ingester->flags = config->flags;
   4610 	ingester->filter = config->ingest_filter;
   4611 	ingester->filter_context = config->filter_context;
   4612 
   4613 	if (!threadpool_init(&ingester->tp, config->ingester_threads,
   4614 			     elem_size, num_elems, &quit_msg, ingester,
   4615 			     ndb_ingester_thread))
   4616 	{
   4617 		fprintf(stderr, "ndb ingester threadpool failed to init\n");
   4618 		return 0;
   4619 	}
   4620 
   4621 	return 1;
   4622 }
   4623 
   4624 static int ndb_writer_destroy(struct ndb_writer *writer)
   4625 {
   4626 	struct ndb_writer_msg msg;
   4627 
   4628 	// kill thread
   4629 	msg.type = NDB_WRITER_QUIT;
   4630 	ndb_debug("writer: pushing quit message\n");
   4631 	if (!prot_queue_push(&writer->inbox, &msg)) {
   4632 		// queue is too full to push quit message. just kill it.
   4633 		ndb_debug("writer: terminating thread\n");
   4634 		THREAD_TERMINATE(writer->thread_id);
   4635 	} else {
   4636 		ndb_debug("writer: joining thread\n");
   4637 		THREAD_FINISH(writer->thread_id);
   4638 	}
   4639 
   4640 	// cleanup
   4641 	ndb_debug("writer: cleaning up protected queue\n");
   4642 	prot_queue_destroy(&writer->inbox);
   4643 
   4644 	free(writer->queue_buf);
   4645 
   4646 	return 1;
   4647 }
   4648 
   4649 static int ndb_ingester_destroy(struct ndb_ingester *ingester)
   4650 {
   4651 	threadpool_destroy(&ingester->tp);
   4652 	return 1;
   4653 }
   4654 
   4655 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize)
   4656 {
   4657 	int rc;
   4658 	MDB_txn *txn;
   4659 
   4660 	if ((rc = mdb_env_create(&lmdb->env))) {
   4661 		fprintf(stderr, "mdb_env_create failed, error %d\n", rc);
   4662 		return 0;
   4663 	}
   4664 
   4665 	if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) {
   4666 		fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc);
   4667 		return 0;
   4668 	}
   4669 
   4670 	if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) {
   4671 		fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc);
   4672 		return 0;
   4673 	}
   4674 
   4675 	if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) {
   4676 		fprintf(stderr, "mdb_env_open failed, error %d\n", rc);
   4677 		return 0;
   4678 	}
   4679 
   4680 	// Initialize DBs
   4681 	if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) {
   4682 		fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc);
   4683 		return 0;
   4684 	}
   4685 
   4686 	// note flatbuffer db
   4687 	if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) {
   4688 		fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc);
   4689 		return 0;
   4690 	}
   4691 
   4692 	// note metadata db
   4693 	if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) {
   4694 		fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc);
   4695 		return 0;
   4696 	}
   4697 
   4698 	// profile flatbuffer db
   4699 	if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) {
   4700 		fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc);
   4701 		return 0;
   4702 	}
   4703 
   4704 	// profile search db
   4705 	if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) {
   4706 		fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc);
   4707 		return 0;
   4708 	}
   4709 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp);
   4710 
   4711 	// ndb metadata (db version, etc)
   4712 	if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) {
   4713 		fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc);
   4714 		return 0;
   4715 	}
   4716 
   4717 	// profile last fetches
   4718 	if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) {
   4719 		fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc);
   4720 		return 0;
   4721 	}
   4722 
   4723 	// id+ts index flags
   4724 	unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED;
   4725 
   4726 	// index dbs
   4727 	if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) {
   4728 		fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
   4729 		return 0;
   4730 	}
   4731 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);
   4732 
   4733 	if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
   4734 		fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc));
   4735 		return 0;
   4736 	}
   4737 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);
   4738 
   4739 	if ((rc = mdb_dbi_open(txn, "note_kind",
   4740 			       MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED,
   4741 			       &lmdb->dbs[NDB_DB_NOTE_KIND]))) {
   4742 		fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc));
   4743 		return 0;
   4744 	}
   4745 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_ts_compare);
   4746 
   4747 	if ((rc = mdb_dbi_open(txn, "note_pubkey",
   4748 			       MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED,
   4749 			       &lmdb->dbs[NDB_DB_NOTE_PUBKEY]))) {
   4750 		fprintf(stderr, "mdb_dbi_open note_pubkey failed: %s\n", mdb_strerror(rc));
   4751 		return 0;
   4752 	}
   4753 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY], ndb_tsid_compare);
   4754 
   4755 	if ((rc = mdb_dbi_open(txn, "note_pubkey_kind",
   4756 			       MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED,
   4757 			       &lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND]))) {
   4758 		fprintf(stderr, "mdb_dbi_open note_pubkey_kind failed: %s\n", mdb_strerror(rc));
   4759 		return 0;
   4760 	}
   4761 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], ndb_id_u64_ts_compare);
   4762 
   4763 	if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT,
   4764 			       &lmdb->dbs[NDB_DB_NOTE_TEXT]))) {
   4765 		fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc));
   4766 		return 0;
   4767 	}
   4768 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare);
   4769 
   4770 	if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY,
   4771 			       &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) {
   4772 		fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc));
   4773 		return 0;
   4774 	}
   4775 
   4776 	if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED,
   4777 			       &lmdb->dbs[NDB_DB_NOTE_TAGS]))) {
   4778 		fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc));
   4779 		return 0;
   4780 	}
   4781 	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare);
   4782 
   4783 	// Commit the transaction
   4784 	if ((rc = mdb_txn_commit(txn))) {
   4785 		fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
   4786 		return 0;
   4787 	}
   4788 
   4789 	return 1;
   4790 }
   4791 
   4792 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version)
   4793 {
   4794 	struct ndb_writer_msg msg;
   4795 	msg.type = NDB_WRITER_DBMETA;
   4796 	msg.ndb_meta.version = version;
   4797 	return ndb_writer_queue_msg(&ndb->writer, &msg);
   4798 }
   4799 
   4800 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
   4801 			     void *sub_cb_ctx)
   4802 {
   4803 	monitor->num_subscriptions = 0;
   4804 	monitor->sub_cb = cb;
   4805 	monitor->sub_cb_ctx = sub_cb_ctx;
   4806 	pthread_mutex_init(&monitor->mutex, NULL);
   4807 }
   4808 
   4809 void ndb_filter_group_destroy(struct ndb_filter_group *group)
   4810 {
   4811 	struct ndb_filter *filter;
   4812 	int i;
   4813 	for (i = 0; i < group->num_filters; i++) {
   4814 		filter = &group->filters[i];
   4815 		ndb_filter_destroy(filter);
   4816 	}
   4817 }
   4818 
   4819 static void ndb_subscription_destroy(struct ndb_subscription *sub)
   4820 {
   4821 	ndb_filter_group_destroy(&sub->group);
   4822 	// this was malloc'd inside ndb_subscribe
   4823 	free(sub->inbox.buf);
   4824 	prot_queue_destroy(&sub->inbox);
   4825 	sub->subid = 0;
   4826 }
   4827 
   4828 static void ndb_monitor_destroy(struct ndb_monitor *monitor)
   4829 {
   4830 	int i;
   4831 
   4832 	ndb_monitor_lock(monitor);
   4833 
   4834 	for (i = 0; i < monitor->num_subscriptions; i++) {
   4835 		ndb_subscription_destroy(&monitor->subscriptions[i]);
   4836 	}
   4837 
   4838 	monitor->num_subscriptions = 0;
   4839 
   4840 	ndb_monitor_unlock(monitor);
   4841 
   4842 	pthread_mutex_destroy(&monitor->mutex);
   4843 }
   4844 
   4845 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config)
   4846 {
   4847 	struct ndb *ndb;
   4848 	//MDB_dbi ind_id; // TODO: ind_pk, etc
   4849 
   4850 	ndb = *pndb = calloc(1, sizeof(struct ndb));
   4851 	ndb->flags = config->flags;
   4852 
   4853 	if (ndb == NULL) {
   4854 		fprintf(stderr, "ndb_init: malloc failed\n");
   4855 		return 0;
   4856 	}
   4857 
   4858 	if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
   4859 		return 0;
   4860 
   4861 	ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx);
   4862 
   4863 	if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags)) {
   4864 		fprintf(stderr, "ndb_writer_init failed\n");
   4865 		return 0;
   4866 	}
   4867 
   4868 	if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) {
   4869 		fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
   4870 				config->ingester_threads);
   4871 		return 0;
   4872 	}
   4873 
   4874 	if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) {
   4875 		struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE };
   4876 		ndb_writer_queue_msg(&ndb->writer, &msg);
   4877 	}
   4878 
   4879 	// Initialize LMDB environment and spin up threads
   4880 	return 1;
   4881 }
   4882 
// Tear down an ndb instance and free it. Passing NULL is a no-op.
//
// Components are destroyed in the order ingester, writer, monitor; the LMDB
// environment is closed last, right before the instance itself is freed.
void ndb_destroy(struct ndb *ndb)
{
	if (ndb == NULL)
		return;

	// ingester depends on writer and must be destroyed first
	ndb_debug("destroying ingester\n");
	ndb_ingester_destroy(&ndb->ingester);
	ndb_debug("destroying writer\n");
	ndb_writer_destroy(&ndb->writer);
	ndb_debug("destroying monitor\n");
	ndb_monitor_destroy(&ndb->monitor);

	ndb_debug("closing env\n");
	mdb_env_close(ndb->lmdb.env);

	ndb_debug("ndb destroyed\n");
	free(ndb);
}
   4902 
// Process a nostr event from a client
//
// ie: ["EVENT", {"content":"..."} ...]
//
// The client-sent variation of ndb_process_event. Hands the json to the
// ingester with the last argument set to 1 and returns the ingester's result.
int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
{
	return ndb_ingest_event(&ndb->ingester, json, len, 1);
}
   4912 
// Process a nostr event from a relay,
//
// ie: ["EVENT", "subid", {"content":"..."}...]
//
// This function returns as soon as possible, first copying the passed
// json and then queueing it up for processing. Worker threads then take
// the json and process it.
//
// Processing:
//
// 1. The event is parsed into ndb_notes and the signature is validated
// 2. A quick lookup is made on the database to see if we already have
//    the note id, if we do we don't need to waste time on json parsing
//    or note validation.
// 3. Once validation is done we pass it to the writer queue for writing
//    to LMDB.
//
// Returns whatever ndb_ingest_event returns (called with the last argument
// set to 0, as opposed to 1 for client events).
int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
{
	return ndb_ingest_event(&ndb->ingester, json, json_len, 0);
}
   4934 
   4935 
   4936 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
   4937 {
   4938 	const char *start, *end, *very_end;
   4939 	start = ldjson;
   4940 	end = start + json_len;
   4941 	very_end = ldjson + json_len;
   4942 	int (* process)(struct ndb *, const char *, int);
   4943 #if DEBUG
   4944 	int processed = 0;
   4945 #endif
   4946 	process = client ? ndb_process_client_event : ndb_process_event;
   4947 
   4948 	while ((end = fast_strchr(start, '\n', very_end - start))) {
   4949 		//printf("processing '%.*s'\n", (int)(end-start), start);
   4950 		if (!process(ndb, start, end - start)) {
   4951 			ndb_debug("ndb_process_client_event failed\n");
   4952 			return 0;
   4953 		}
   4954 		start = end + 1;
   4955 #if DEBUG
   4956 		processed++;
   4957 #endif
   4958 	}
   4959 
   4960 #if DEBUG
   4961 	ndb_debug("ndb_process_events: processed %d events\n", processed);
   4962 #endif
   4963 
   4964 	return 1;
   4965 }
   4966 
   4967 #ifndef _WIN32
   4968 // TODO: windows
   4969 int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
   4970 {
   4971 	char *line = NULL;
   4972 	size_t len = 0;
   4973 	ssize_t nread;
   4974 
   4975 	while ((nread = getline(&line, &len, fp)) != -1) {
   4976 		if (line == NULL)
   4977 			break;
   4978 		ndb_process_event(ndb, line, len);
   4979 	}
   4980 
   4981 	if (line)
   4982 		free(line);
   4983 
   4984 	return 1;
   4985 }
   4986 #endif
   4987 
// Process newline-delimited client events; forwards to _ndb_process_events
// with the client flag set to 1 and returns its result.
int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
{
	return _ndb_process_events(ndb, ldjson, json_len, 1);
}
   4992 
// Process newline-delimited relay events; forwards to _ndb_process_events
// with the client flag set to 0 and returns its result.
int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
{
	return _ndb_process_events(ndb, ldjson, json_len, 0);
}
   4997 
// Push a tag header onto the cursor: only the tag's element count is
// written, as a u16. Returns the result of cursor_push_u16.
static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
{
	return cursor_push_u16(cur, tag->count);
}
   5002 
   5003 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf,
   5004 		     size_t bufsize)
   5005 {
   5006 	struct ndb_note *note;
   5007 	int half, size, str_indices_size;
   5008 
   5009 	// come on bruh
   5010 	if (bufsize < sizeof(struct ndb_note) * 2)
   5011 		return 0;
   5012 
   5013 	str_indices_size = bufsize / 32;
   5014 	size = bufsize - str_indices_size;
   5015 	half = size / 2;
   5016 
   5017 	//debug("size %d half %d str_indices %d\n", size, half, str_indices_size);
   5018 
   5019 	// make a safe cursor of our available memory
   5020 	make_cursor(buf, buf + bufsize, &builder->mem);
   5021 
   5022 	note = builder->note = (struct ndb_note *)buf;
   5023 
   5024 	// take slices of the memory into subcursors
   5025 	if (!(cursor_slice(&builder->mem, &builder->note_cur, half) &&
   5026 	      cursor_slice(&builder->mem, &builder->strings, half) &&
   5027 	      cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) {
   5028 		return 0;
   5029 	}
   5030 
   5031 	memset(note, 0, sizeof(*note));
   5032 	builder->note_cur.p += sizeof(*note);
   5033 
   5034 	note->strings = builder->strings.start - buf;
   5035 	note->version = 1;
   5036 
   5037 	return 1;
   5038 }
   5039 
   5040 
   5041 
   5042 static inline int ndb_json_parser_init(struct ndb_json_parser *p,
   5043 				       const char *json, int json_len,
   5044 				       unsigned char *buf, int bufsize)
   5045 {
   5046 	int half = bufsize / 2;
   5047 
   5048 	unsigned char *tok_start = buf + half;
   5049 	unsigned char *tok_end = buf + bufsize;
   5050 
   5051 	p->toks = (jsmntok_t*)tok_start;
   5052 	p->toks_end = (jsmntok_t*)tok_end;
   5053 	p->num_tokens = 0;
   5054 	p->json = json;
   5055 	p->json_len = json_len;
   5056 
   5057 	// ndb_builder gets the first half of the buffer, and jsmn gets the
   5058 	// second half. I like this way of alloating memory (without actually
   5059 	// dynamically allocating memory). You get one big chunk upfront and
   5060 	// then submodules can recursively subdivide it. Maybe you could do
   5061 	// something even more clever like golden-ratio style subdivision where
   5062 	// the more important stuff gets a larger chunk and then it spirals
   5063 	// downward into smaller chunks. Thanks for coming to my TED talk.
   5064 
   5065 	if (!ndb_builder_init(&p->builder, buf, half))
   5066 		return 0;
   5067 
   5068 	jsmn_init(&p->json_parser);
   5069 
   5070 	return 1;
   5071 }
   5072 
// Tokenize p->json with jsmn into the token array set up by
// ndb_json_parser_init.
//
// Whether cb is non-NULL is passed as a flag to jsmn_parse. A -42 result is
// treated as "an id token was just produced": cb->fn is called with cb->data
// and a pointer to that token's text inside the json. If the callback
// answers NDB_IDRES_CONT, parsing is resumed with the flag cleared; if it
// answers NDB_IDRES_STOP, -42 is returned to the caller.
//
// Returns 0 if jsmn_parse reported 0, -42 on an early stop, and 1 otherwise
// with p->num_tokens set to the jsmn result and p->i reset to 0.
//
// NOTE(review): a negative jsmn result other than -42 is not rejected here;
// it is stored in p->num_tokens and 1 is returned. Callers presumably have
// to validate num_tokens themselves -- confirm.
static inline int ndb_json_parser_parse(struct ndb_json_parser *p,
					struct ndb_id_cb *cb)
{
	jsmntok_t *tok;
	// number of tokens that fit between toks and toks_end
	int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks);
	int res =
		jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL);

	// got an ID!
	if (res == -42) {
		// the id is the most recently produced token
		tok = &p->toks[p->json_parser.toknext-1];

		switch (cb->fn(cb->data, p->json + tok->start)) {
		case NDB_IDRES_CONT:
			res = jsmn_parse(&p->json_parser, p->json, p->json_len,
					 p->toks, cap, 0);
			break;
		case NDB_IDRES_STOP:
			return -42;
		}
	} else if (res == 0) {
		return 0;
	}

	p->num_tokens = res;
	p->i = 0;

	return 1;
}
   5102 
// Length of a jsmn token's text: the distance between its start and end
// offsets.
static inline int toksize(jsmntok_t *tok)
{
	return tok->end - tok->start;
}
   5107 
   5108 
   5109 
   5110 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2)
   5111 {
   5112 	switch (c2) {
   5113 	case 't':  return cursor_push_byte(cur, '\t');
   5114 	case 'n':  return cursor_push_byte(cur, '\n');
   5115 	case 'r':  return cursor_push_byte(cur, '\r');
   5116 	case 'b':  return cursor_push_byte(cur, '\b');
   5117 	case 'f':  return cursor_push_byte(cur, '\f');
   5118 	case '\\': return cursor_push_byte(cur, '\\');
   5119 	case '/':  return cursor_push_byte(cur, '/');
   5120 	case '"':  return cursor_push_byte(cur, '"');
   5121 	case 'u':
   5122 		// these aren't handled yet
   5123 		return 0;
   5124 	default:
   5125 		return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2);
   5126 	}
   5127 }
   5128 
   5129 static int cursor_push_escaped_char(struct cursor *cur, char c)
   5130 {
   5131         switch (c) {
   5132         case '"':  return cursor_push_str(cur, "\\\"");
   5133         case '\\': return cursor_push_str(cur, "\\\\");
   5134         case '\b': return cursor_push_str(cur, "\\b");
   5135         case '\f': return cursor_push_str(cur, "\\f");
   5136         case '\n': return cursor_push_str(cur, "\\n");
   5137         case '\r': return cursor_push_str(cur, "\\r");
   5138         case '\t': return cursor_push_str(cur, "\\t");
   5139         // TODO: \u hex hex hex hex
   5140         }
   5141         return cursor_push_byte(cur, c);
   5142 }
   5143 
   5144 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len)
   5145 {
   5146 	int i;
   5147 
   5148 	if (len % 2 != 0)
   5149 		return 0;
   5150 
   5151         if (!cursor_push_byte(cur, '"'))
   5152                 return 0;
   5153 
   5154 	for (i = 0; i < len; i++) {
   5155 		unsigned int c = ((const unsigned char *)buf)[i];
   5156 		if (!cursor_push_byte(cur, hexchar(c >> 4)))
   5157 			return 0;
   5158 		if (!cursor_push_byte(cur, hexchar(c & 0xF)))
   5159 			return 0;
   5160 	}
   5161 
   5162         if (!cursor_push_byte(cur, '"'))
   5163                 return 0;
   5164 
   5165 	return 1;
   5166 }
   5167 
   5168 static int cursor_push_jsonstr(struct cursor *cur, const char *str)
   5169 {
   5170 	int i;
   5171         int len;
   5172 
   5173 	len = strlen(str);
   5174 
   5175         if (!cursor_push_byte(cur, '"'))
   5176                 return 0;
   5177 
   5178         for (i = 0; i < len; i++) {
   5179                 if (!cursor_push_escaped_char(cur, str[i]))
   5180                         return 0;
   5181         }
   5182 
   5183         if (!cursor_push_byte(cur, '"'))
   5184                 return 0;
   5185 
   5186         return 1;
   5187 }
   5188 
   5189 
// Push one tag element as json. Elements flagged NDB_PACKED_ID are written
// as a quoted hex string of their 32 id bytes; everything else is written as
// an escaped json string.
static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str)
{
	if (str.flag == NDB_PACKED_ID)
		return cursor_push_hex_str(cur, str.id, 32);

	return cursor_push_jsonstr(cur, str.str);
}
   5197 
   5198 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note,
   5199 				struct ndb_tag *tag)
   5200 {
   5201         int i;
   5202 
   5203         if (!cursor_push_byte(cur, '['))
   5204                 return 0;
   5205 
   5206         for (i = 0; i < tag->count; i++) {
   5207                 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i)))
   5208                         return 0;
   5209                 if (i != tag->count-1 && !cursor_push_byte(cur, ','))
   5210 			return 0;
   5211         }
   5212 
   5213         return cursor_push_byte(cur, ']');
   5214 }
   5215 
   5216 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note)
   5217 {
   5218 	int i;
   5219 	struct ndb_iterator iter, *it = &iter;
   5220 	ndb_tags_iterate_start(note, it);
   5221 
   5222         if (!cursor_push_byte(cur, '['))
   5223                 return 0;
   5224 
   5225 	i = 0;
   5226 	while (ndb_tags_iterate_next(it)) {
   5227 		if (!cursor_push_json_tag(cur, note, it->tag))
   5228 			return 0;
   5229                 if (i != note->tags.count-1 && !cursor_push_str(cur, ","))
   5230 			return 0;
   5231 		i++;
   5232 	}
   5233 
   5234         if (!cursor_push_byte(cur, ']'))
   5235                 return 0;
   5236 
   5237 	return 1;
   5238 }
   5239 
   5240 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen)
   5241 {
   5242 	char timebuf[16] = {0};
   5243 	char kindbuf[16] = {0};
   5244 	char pubkey[65];
   5245 	struct cursor cur;
   5246 	int ok;
   5247 
   5248 	if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey))
   5249 		return 0;
   5250 
   5251 	make_cursor(buf, buf + buflen, &cur);
   5252 
   5253 	// TODO: update in 2106 ...
   5254 	snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at);
   5255 	snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind);
   5256 
   5257 	ok =
   5258 		cursor_push_str(&cur, "[0,\"") &&
   5259 		cursor_push_str(&cur, pubkey) &&
   5260 		cursor_push_str(&cur, "\",") &&
   5261 		cursor_push_str(&cur, timebuf) &&
   5262 		cursor_push_str(&cur, ",") &&
   5263 		cursor_push_str(&cur, kindbuf) &&
   5264 		cursor_push_str(&cur, ",") &&
   5265 		cursor_push_json_tags(&cur, ev) &&
   5266 		cursor_push_str(&cur, ",") &&
   5267 		cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) &&
   5268 		cursor_push_str(&cur, "]");
   5269 
   5270 	if (!ok)
   5271 		return 0;
   5272 
   5273 	return cur.p - cur.start;
   5274 }
   5275 
   5276 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len)
   5277 {
   5278 	int i;
   5279 	unsigned char chr;
   5280 	if (c->p + (len * 2) >= c->end)
   5281 		return 0;
   5282 
   5283 	for (i = 0; i < len; i++) {
   5284 		chr = bytes[i];
   5285 
   5286 		*(c->p++) = hexchar(chr >> 4);
   5287 		*(c->p++) = hexchar(chr & 0xF);
   5288 	}
   5289 
   5290 	return 1;
   5291 }
   5292 
   5293 static int cursor_push_int_str(struct cursor *c, uint64_t num)
   5294 {
   5295 	char timebuf[16] = {0};
   5296 	snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num);
   5297 	return cursor_push_str(c, timebuf);
   5298 }
   5299 
   5300 int ndb_note_json(struct ndb_note *note, char *buf, int buflen)
   5301 {
   5302 	struct cursor cur, *c = &cur;
   5303 
   5304 	make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur);
   5305 
   5306 	int ok = cursor_push_str(c, "{\"id\":\"") &&
   5307 	       cursor_push_hex(c, ndb_note_id(note), 32) &&
   5308 	       cursor_push_str(c, "\",\"pubkey\":\"") &&
   5309 	       cursor_push_hex(c, ndb_note_pubkey(note), 32) &&
   5310 	       cursor_push_str(c, "\",\"created_at\":") &&
   5311 	       cursor_push_int_str(c, ndb_note_created_at(note)) &&
   5312 	       cursor_push_str(c, ",\"kind\":") &&
   5313 	       cursor_push_int_str(c, ndb_note_kind(note)) &&
   5314 	       cursor_push_str(c, ",\"tags\":") &&
   5315 	       cursor_push_json_tags(c, note) &&
   5316 	       cursor_push_str(c, ",\"content\":") &&
   5317 	       cursor_push_jsonstr(c, ndb_note_content(note)) &&
   5318 	       cursor_push_str(c, ",\"sig\":\"") &&
   5319 	       cursor_push_hex(c, ndb_note_sig(note), 64) &&
   5320 	       cursor_push_c_str(c, "\"}");
   5321 
   5322 	if (!ok) {
   5323 		return 0;
   5324 	}
   5325 
   5326 	return cur.p - cur.start;
   5327 }
   5328 
   5329 static int cursor_push_json_elem_array(struct cursor *cur,
   5330 				       const struct ndb_filter *filter,
   5331 				       struct ndb_filter_elements *elems)
   5332 {
   5333 	int i;
   5334 	unsigned char *id;
   5335 	const char *str;
   5336 	uint64_t val;
   5337 
   5338 	if (!cursor_push_byte(cur, '['))
   5339 		return 0;
   5340 
   5341 	for (i = 0; i < elems->count; i++) {
   5342 
   5343 		switch (elems->field.elem_type)  {
   5344 		case NDB_ELEMENT_STRING:
   5345 			str = ndb_filter_get_string_element(filter, elems, i);
   5346 			if (!cursor_push_jsonstr(cur, str))
   5347 				return 0;
   5348 			break;
   5349 		case NDB_ELEMENT_ID:
   5350 			id = ndb_filter_get_id_element(filter, elems, i);
   5351 			if (!cursor_push_hex_str(cur, id, 32))
   5352 				return 0;
   5353 			break;
   5354 		case NDB_ELEMENT_INT:
   5355 			val = ndb_filter_get_int_element(elems, i);
   5356 			if (!cursor_push_int_str(cur, val))
   5357 				return 0;
   5358 			break;
   5359 		case NDB_ELEMENT_UNKNOWN:
   5360 			ndb_debug("unknown element in cursor_push_json_elem_array");
   5361 			return 0;
   5362 		}
   5363 
   5364 		if (i != elems->count-1) {
   5365 			if (!cursor_push_byte(cur, ','))
   5366 				return 0;
   5367 		}
   5368 	}
   5369 
   5370 	if (!cursor_push_str(cur, "]"))
   5371 		return 0;
   5372 
   5373 	return 1;
   5374 }
   5375 
// Serialize a finalized filter as a json object into buf.
//
// Every field of the filter becomes one key: "ids", "authors", "kinds" and
// "#<tag char>" hold arrays, while "since", "until" and "limit" hold the
// field's first int element.
//
// Returns 0 if the filter is not finalized or if any piece does not fit into
// the buffer; otherwise returns how far the cursor advanced.
// NOTE(review): the closing brace is pushed with cursor_push_c_str, which
// presumably also writes a NUL terminator that would then be included in the
// returned length -- confirm.
int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen)
{
	struct cursor cur, *c = &cur;
	struct ndb_filter_elements *elems;
	int i;

	if (!filter->finalized) {
		ndb_debug("filter not finalized in ndb_filter_json\n");
		return 0;
	}

	make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c);

	if (!cursor_push_str(c, "{"))
		return 0;

	for (i = 0; i < filter->num_elements; i++) {
		elems = ndb_filter_get_elements(filter, i);
		// there is no default case: a field type not listed here
		// writes no key, but the separator below is still written
		switch (elems->field.type) {
		case NDB_FILTER_IDS:
			if (!cursor_push_str(c, "\"ids\":"))
				return 0;
			if (!cursor_push_json_elem_array(c, filter, elems))
				return 0;
			break;
		case NDB_FILTER_AUTHORS:
			if (!cursor_push_str(c, "\"authors\":"))
				return 0;
			if (!cursor_push_json_elem_array(c, filter, elems))
				return 0;
			break;
		case NDB_FILTER_KINDS:
			if (!cursor_push_str(c, "\"kinds\":"))
				return 0;
			if (!cursor_push_json_elem_array(c, filter, elems))
				return 0;
			break;
		case NDB_FILTER_TAGS:
			// tag filters are keyed by '#' followed by the tag char
			if (!cursor_push_str(c, "\"#"))
				return 0;
			if (!cursor_push_byte(c, elems->field.tag))
				return 0;
			if (!cursor_push_str(c, "\":"))
				return 0;
			if (!cursor_push_json_elem_array(c, filter, elems))
				return 0;
			break;
		case NDB_FILTER_SINCE:
			if (!cursor_push_str(c, "\"since\":"))
				return 0;
			if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
				return 0;
			break;
		case NDB_FILTER_UNTIL:
			if (!cursor_push_str(c, "\"until\":"))
				return 0;
			if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
				return 0;
			break;
		case NDB_FILTER_LIMIT:
			if (!cursor_push_str(c, "\"limit\":"))
				return 0;
			if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
				return 0;
			break;
		}

		// separate fields with a comma, except after the last one
		if (i != filter->num_elements-1) {
			if (!cursor_push_byte(c, ',')) {
				return 0;
			}
		}

	}

	if (!cursor_push_c_str(c, "}"))
		return 0;

	return cur.p - cur.start;
}
   5456 
   5457 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) {
   5458 	int len;
   5459 
   5460 	if (!(len = ndb_event_commitment(note, buf, buflen)))
   5461 		return 0;
   5462 
   5463 	//fprintf(stderr, "%.*s\n", len, buf);
   5464 
   5465 	sha256((struct sha256*)note->id, buf, len);
   5466 
   5467 	return 1;
   5468 }
   5469 
   5470 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32],
   5471 		unsigned char sig[64])
   5472 {
   5473 	unsigned char aux[32];
   5474 	secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair;
   5475 
   5476 	if (!fill_random(aux, sizeof(aux)))
   5477 		return 0;
   5478 
   5479 	secp256k1_context *ctx =
   5480 		secp256k1_context_create(SECP256K1_CONTEXT_NONE);
   5481 
   5482 	return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux);
   5483 }
   5484 
   5485 int ndb_create_keypair(struct ndb_keypair *kp)
   5486 {
   5487 	secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair;
   5488 	secp256k1_xonly_pubkey pubkey;
   5489 
   5490 	secp256k1_context *ctx =
   5491 		secp256k1_context_create(SECP256K1_CONTEXT_NONE);;
   5492 
   5493 	/* Try to create a keypair with a valid context, it should only
   5494 	 * fail if the secret key is zero or out of range. */
   5495 	if (!secp256k1_keypair_create(ctx, keypair, kp->secret))
   5496 		return 0;
   5497 
   5498 	if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair))
   5499 		return 0;
   5500 
   5501 	/* Serialize the public key. Should always return 1 for a valid public key. */
   5502 	return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey);
   5503 }
   5504 
// Build a keypair from a hex-encoded secret key string.
//
// The string is hex-decoded into the 32-byte keypair->secret; on failure an
// error is printed to stderr and 0 is returned. Otherwise the result of
// ndb_create_keypair is returned.
int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair)
{
	if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) {
		fprintf(stderr, "could not hex decode secret key\n");
		return 0;
	}

	return ndb_create_keypair(keypair);
}
   5514 
// Finish building a note.
//
// The strings table is moved so that it directly follows the note data, the
// note's strings offset is updated accordingly and *note is pointed at the
// finished note. If a keypair is given, the note's pubkey is set from it,
// the id is calculated and the note is signed.
//
// Returns the total size of the note rounded up to a multiple of 8, or 0 if
// calculating the id or signing fails.
int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note,
			 struct ndb_keypair *keypair)
{
	int strings_len = builder->strings.p - builder->strings.start;
	unsigned char *note_end = builder->note_cur.p + strings_len;
	int total_size = note_end - builder->note_cur.start;

	// move the strings buffer next to the end of our ndb_note
	memmove(builder->note_cur.p, builder->strings.start, strings_len);

	// set the strings location
	builder->note->strings = builder->note_cur.p - builder->note_cur.start;

	// record the total size
	//builder->note->size = total_size;

	*note = builder->note;

	// generate id and sign if we're building this manually
	if (keypair) {
		// use the remaining memory for building our id buffer
		unsigned char *end   = builder->mem.end;
		// scratch space starts right after the (unpadded) note
		unsigned char *start = (unsigned char*)(*note) + total_size;

		ndb_builder_set_pubkey(builder, keypair->pubkey);

		if (!ndb_calculate_id(builder->note, start, end - start))
			return 0;

		if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig))
			return 0;
	}

	// make sure we're aligned as a whole
	total_size = (total_size + 7) & ~7;
	assert((total_size % 8) == 0);
	return total_size;
}
   5553 
// Accessor for the note currently being built by the builder.
struct ndb_note * ndb_builder_note(struct ndb_builder *builder)
{
	return builder->note;
}
   5558 
// Make a packed string that refers to an offset in the strings table. Only
// the low 24 bits of the offset are kept.
static union ndb_packed_str ndb_offset_str(uint32_t offset)
{
	// ensure accidents like -1 don't corrupt our packed_str
	union ndb_packed_str str;
	// most significant byte is reserved for ndb_packtype
	str.offset = offset & 0xFFFFFF;
	return str;
}
   5567 
   5568 
   5569 /// find an existing string via str_indices. these indices only exist in the
   5570 /// builder phase just for this purpose.
   5571 static inline int ndb_builder_find_str(struct ndb_builder *builder,
   5572 				       const char *str, int len,
   5573 				       union ndb_packed_str *pstr)
   5574 {
   5575 	// find existing matching string to avoid duplicate strings
   5576 	int indices = cursor_count(&builder->str_indices, sizeof(uint32_t));
   5577 	for (int i = 0; i < indices; i++) {
   5578 		uint32_t index = ((uint32_t*)builder->str_indices.start)[i];
   5579 		const char *some_str = (const char*)builder->strings.start + index;
   5580 
   5581 		if (!memcmp(some_str, str, len) && some_str[len] == '\0') {
   5582 			// found an existing matching str, use that index
   5583 			*pstr = ndb_offset_str(index);
   5584 			return 1;
   5585 		}
   5586 	}
   5587 
   5588 	return 0;
   5589 }
   5590 
   5591 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str,
   5592 				int len, union ndb_packed_str *pstr)
   5593 {
   5594 	uint32_t loc;
   5595 
   5596 	// no string found, push a new one
   5597 	loc = builder->strings.p - builder->strings.start;
   5598 	if (!(cursor_push(&builder->strings, (unsigned char*)str, len) &&
   5599 	      cursor_push_byte(&builder->strings, '\0'))) {
   5600 		return 0;
   5601 	}
   5602 
   5603 	*pstr = ndb_offset_str(loc);
   5604 
   5605 	// record in builder indices. ignore return value, if we can't cache it
   5606 	// then whatever
   5607 	cursor_push_u32(&builder->str_indices, loc);
   5608 
   5609 	return 1;
   5610 }
   5611 
   5612 static int ndb_builder_push_packed_id(struct ndb_builder *builder,
   5613 				      unsigned char *id,
   5614 				      union ndb_packed_str *pstr)
   5615 {
   5616 	// Don't both find id duplicates. very rarely are they duplicated
   5617 	// and it slows things down quite a bit. If we really care about this
   5618 	// We can switch to a hash table.
   5619 	//if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) {
   5620 	//	pstr->packed.flag = NDB_PACKED_ID;
   5621 	//	return 1;
   5622 	//}
   5623 
   5624 	if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) {
   5625 		pstr->packed.flag = NDB_PACKED_ID;
   5626 		return 1;
   5627 	}
   5628 
   5629 	return 0;
   5630 }
   5631 
// Make an inline packed string out of two characters. The characters and a
// NUL terminator are stored in the packed string itself and it is flagged
// NDB_PACKED_STR.
union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2)
{
	union ndb_packed_str str;
	str.packed.flag = NDB_PACKED_STR;
	str.packed.str[0] = c1;
	str.packed.str[1] = c2;
	str.packed.str[2] = '\0';
	return str;
}
   5641 
// Make an inline packed string out of a single character, stored together
// with a NUL terminator and flagged NDB_PACKED_STR. Passing 0 yields the
// empty string.
static union ndb_packed_str ndb_char_to_packed_str(char c)
{
	union ndb_packed_str str;
	str.packed.flag = NDB_PACKED_STR;
	str.packed.str[0] = c;
	str.packed.str[1] = '\0';
	return str;
}
   5650 
   5651 
   5652 /// Check for small strings to pack
   5653 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder,
   5654 					      const char *str, int len,
   5655 					      union ndb_packed_str *pstr,
   5656 					      int pack_ids)
   5657 {
   5658 	unsigned char id_buf[32];
   5659 
   5660 	if (len == 0) {
   5661 		*pstr = ndb_char_to_packed_str(0);
   5662 		return 1;
   5663 	} else if (len == 1) {
   5664 		*pstr = ndb_char_to_packed_str(str[0]);
   5665 		return 1;
   5666 	} else if (len == 2) {
   5667 		*pstr = ndb_chars_to_packed_str(str[0], str[1]);
   5668 		return 1;
   5669 	} else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) {
   5670 		return ndb_builder_push_packed_id(builder, id_buf, pstr);
   5671 	}
   5672 
   5673 	return 0;
   5674 }
   5675 
   5676 
// Store a string in the strings table, reusing an identical string that was
// pushed earlier if there is one. Returns 1 on success, 0 if the string had
// to be pushed and that failed.
static int ndb_builder_push_unpacked_str(struct ndb_builder *builder,
					 const char *str, int len,
					 union ndb_packed_str *pstr)
{
	if (ndb_builder_find_str(builder, str, len, pstr))
		return 1;

	return ndb_builder_push_str(builder, str, len, pstr);
}
   5686 
// Turn a string into a packed string: the compact representations are tried
// first, and only if none applies is the string stored (deduplicated) in the
// strings table. pack_ids controls whether 64-char hex strings may be stored
// as raw ids.
int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len,
			 union ndb_packed_str *pstr, int pack_ids)
{
	if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids))
		return 1;

	return ndb_builder_push_unpacked_str(builder, str, len, pstr);
}
   5695 
// Set the content of the note being built and record its length. Content is
// never packed as an id, even if it happens to be a 64-char hex string.
// Returns the result of ndb_builder_make_str.
int ndb_builder_set_content(struct ndb_builder *builder, const char *content,
			    int len)
{
	int pack_ids = 0;
	builder->note->content_length = len;
	return ndb_builder_make_str(builder, content, len,
				    &builder->note->content, pack_ids);
}
   5704 
   5705 
   5706 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len,
   5707 			 const char *s)
   5708 {
   5709 	if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len &&
   5710 	    memcmp(json + tok->start, s, tok_len) == 0) {
   5711 		return 1;
   5712 	}
   5713 	return 0;
   5714 }
   5715 
// Append one element (given as a packed string) to the tag currently being
// built: its 32-bit value is pushed to the note data and the tag's element
// count is incremented. Returns 0 if the note data is full, 1 otherwise.
static int ndb_builder_finalize_tag(struct ndb_builder *builder,
				    union ndb_packed_str offset)
{
	if (!cursor_push_u32(&builder->note_cur, offset.offset))
		return 0;
	builder->current_tag->count++;
	return 1;
}
   5724 
/// Unescape and push json strings
///
/// `str`/`len` is the raw (still escaped) text of a json string. Compact
/// representations are tried first; otherwise the text is unescaped straight
/// into the strings table and NUL-terminated, and *pstr is set to its offset.
///
/// If `written` is non-NULL it receives the number of unescaped bytes pushed
/// (without the terminator). When a compact representation is used it is
/// left at the raw `len`.
///
/// Returns 0 if the strings table is full or an escape can't be handled
/// (cursor_push_unescaped_char fails, e.g. for \u), non-zero otherwise.
static int ndb_builder_make_json_str(struct ndb_builder *builder,
				     const char *str, int len,
				     union ndb_packed_str *pstr,
				     int *written, int pack_ids)
{
	// let's not care about de-duping these. we should just unescape
	// in-place directly into the strings table.
	if (written)
		*written = len;

	const char *p, *end, *start;
	unsigned char *builder_start;

	// always try compact strings first
	if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids))
		return 1;

	end = str + len;
	start = str; // Initialize start to the beginning of the string

	*pstr = ndb_offset_str(builder->strings.p - builder->strings.start);
	builder_start = builder->strings.p;

	for (p = str; p < end; p++) {
		// a backslash that is the very last character is not treated
		// as an escape, it ends up in the final chunk as-is
		if (*p == '\\' && p+1 < end) {
			// Push the chunk of unescaped characters before this escape sequence
			if (start < p && !cursor_push(&builder->strings,
						(unsigned char *)start,
						p - start)) {
				return 0;
			}

			if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1)))
				return 0;

			p++; // Skip the character following the backslash
			start = p + 1; // Update the start pointer to the next character
		}
	}

	// Handle the last chunk after the last escape sequence (or if there are no escape sequences at all)
	if (start < p && !cursor_push(&builder->strings, (unsigned char *)start,
				      p - start)) {
		return 0;
	}

	if (written)
		*written = builder->strings.p - builder_start;

	// TODO: dedupe these!?
	return cursor_push_byte(&builder->strings, '\0');
}
   5778 
// Unescape one raw json string and append it as an element to the tag
// currently being built. Tag elements may be packed as ids. Returns 0 on
// failure.
static int ndb_builder_push_json_tag(struct ndb_builder *builder,
				     const char *str, int len)
{
	union ndb_packed_str pstr;
	int pack_ids = 1;
	if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids))
		return 0;
	return ndb_builder_finalize_tag(builder, pstr);
}
   5788 
   5789 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag
   5790 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p,
   5791 					   jsmntok_t *array)
   5792 {
   5793 	jsmntok_t *str_tok;
   5794 	const char *str;
   5795 
   5796 	if (array->size == 0)
   5797 		return 0;
   5798 
   5799 	if (!ndb_builder_new_tag(&p->builder))
   5800 		return 0;
   5801 
   5802 	for (int i = 0; i < array->size; i++) {
   5803 		str_tok = &array[i+1];
   5804 		str = p->json + str_tok->start;
   5805 
   5806 		if (!ndb_builder_push_json_tag(&p->builder, str,
   5807 					       toksize(str_tok))) {
   5808 			return 0;
   5809 		}
   5810 	}
   5811 
   5812 	return 1;
   5813 }
   5814 
// Push json tags into ndb data
//   [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags
//
// `array` is the token of the outer array. An empty outer array is fine and
// yields no tags. Returns 0 as soon as one tag can't be built, 1 otherwise.
static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p,
						jsmntok_t *array)
{
	jsmntok_t *tag = array;

	if (array->size == 0)
		return 1;

	for (int i = 0; i < array->size; i++) {
		if (!ndb_builder_tag_from_json_array(p, &tag[i+1]))
			return 0;
		// skip over the element tokens of the tag we just handled so
		// that tag[i+1] is the next inner array on the next iteration
		// NOTE(review): assumes every element is a single token (no
		// nested arrays/objects inside a tag) -- confirm
		tag += tag[i+1].size;
	}

	return 1;
}
   5833 
   5834 static int parse_unsigned_int(const char *start, int len, unsigned int *num)
   5835 {
   5836 	unsigned int number = 0;
   5837 	const char *p = start, *end = start + len;
   5838 	int digits = 0;
   5839 
   5840 	while (p < end) {
   5841 		char c = *p;
   5842 
   5843 		if (c < '0' || c > '9')
   5844 			break;
   5845 
   5846 		// Check for overflow
   5847 		char digit = c - '0';
   5848 		if (number > (UINT_MAX - digit) / 10)
   5849 			return 0; // Overflow detected
   5850 
   5851 		number = number * 10 + digit;
   5852 
   5853 		p++;
   5854 		digits++;
   5855 	}
   5856 
   5857 	if (digits == 0)
   5858 		return 0;
   5859 
   5860 	*num = number;
   5861 	return 1;
   5862 }
   5863 
   5864 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
   5865 			       unsigned char *buf, int bufsize, struct ndb_id_cb *cb)
   5866 {
   5867 	jsmntok_t *tok = NULL;
   5868 	int tok_len, res;
   5869 	struct ndb_json_parser parser;
   5870 	struct ndb_event *ev = &fce->event;
   5871 
   5872 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   5873 
   5874 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
   5875 		return res;
   5876 
   5877 	if (parser.toks[0].type == JSMN_OBJECT) {
   5878 		ndb_debug("got raw json in client_event_from_json\n");
   5879 		fce->evtype = NDB_FCE_EVENT;
   5880 		return ndb_parse_json_note(&parser, &ev->note);
   5881 	}
   5882 
   5883 	if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
   5884 		return 0;
   5885 
   5886 	parser.i = 1;
   5887 	tok = &parser.toks[parser.i++];
   5888 	tok_len = toksize(tok);
   5889 	if (tok->type != JSMN_STRING)
   5890 		return 0;
   5891 
   5892 	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
   5893 		fce->evtype = NDB_FCE_EVENT;
   5894 		return ndb_parse_json_note(&parser, &ev->note);
   5895 	}
   5896 
   5897 	return 0;
   5898 }
   5899 
   5900 
   5901 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
   5902 			   unsigned char *buf, int bufsize,
   5903 			   struct ndb_id_cb *cb)
   5904 {
   5905 	jsmntok_t *tok = NULL;
   5906 	int tok_len, res;
   5907 	struct ndb_json_parser parser;
   5908 	struct ndb_event *ev = &tce->event;
   5909 
   5910 	tce->subid_len = 0;
   5911 	tce->subid = "";
   5912 
   5913 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   5914 
   5915 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
   5916 		return res;
   5917 
   5918 	if (parser.toks[0].type == JSMN_OBJECT) {
   5919 		ndb_debug("got raw json in ws_event_from_json\n");
   5920 		tce->evtype = NDB_TCE_EVENT;
   5921 		return ndb_parse_json_note(&parser, &ev->note);
   5922 	}
   5923 
   5924 	if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
   5925 		return 0;
   5926 
   5927 	parser.i = 1;
   5928 	tok = &parser.toks[parser.i++];
   5929 	tok_len = toksize(tok);
   5930 	if (tok->type != JSMN_STRING)
   5931 		return 0;
   5932 
   5933 	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
   5934 		tce->evtype = NDB_TCE_EVENT;
   5935 
   5936 		tok = &parser.toks[parser.i++];
   5937 		if (tok->type != JSMN_STRING)
   5938 			return 0;
   5939 
   5940 		tce->subid = json + tok->start;
   5941 		tce->subid_len = toksize(tok);
   5942 
   5943 		return ndb_parse_json_note(&parser, &ev->note);
   5944 	} else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) {
   5945 		tce->evtype = NDB_TCE_EOSE;
   5946 
   5947 		tok = &parser.toks[parser.i++];
   5948 		if (tok->type != JSMN_STRING)
   5949 			return 0;
   5950 
   5951 		tce->subid = json + tok->start;
   5952 		tce->subid_len = toksize(tok);
   5953 		return 1;
   5954 	} else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) {
   5955 		if (parser.num_tokens != 5)
   5956 			return 0;
   5957 
   5958 		struct ndb_command_result *cr = &tce->command_result;
   5959 
   5960 		tce->evtype = NDB_TCE_OK;
   5961 
   5962 		tok = &parser.toks[parser.i++];
   5963 		if (tok->type != JSMN_STRING)
   5964 			return 0;
   5965 
   5966 		tce->subid = json + tok->start;
   5967 		tce->subid_len = toksize(tok);
   5968 
   5969 		tok = &parser.toks[parser.i++];
   5970 		if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0)
   5971 			return 0;
   5972 
   5973 		cr->ok = (json + tok->start)[0] == 't';
   5974 
   5975 		tok = &parser.toks[parser.i++];
   5976 		if (tok->type != JSMN_STRING)
   5977 			return 0;
   5978 
   5979 		tce->command_result.msg = json + tok->start;
   5980 		tce->command_result.msglen = toksize(tok);
   5981 
   5982 		return 1;
   5983 	} else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) {
   5984 		tce->evtype = NDB_TCE_AUTH;
   5985 
   5986 		tok = &parser.toks[parser.i++];
   5987 		if (tok->type != JSMN_STRING)
   5988 			return 0;
   5989 
   5990 		tce->subid = json + tok->start;
   5991 		tce->subid_len = toksize(tok);
   5992 
   5993 		return 1;
   5994 	}
   5995 
   5996 	return 0;
   5997 }
   5998 
   5999 static enum ndb_filter_fieldtype
   6000 ndb_filter_parse_field(const char *tok, int len, char *tagchar)
   6001 {
   6002 	*tagchar = 0;
   6003 
   6004 	if (len == 0)
   6005 		return 0;
   6006 
   6007 	if (len == 7 && !strncmp(tok, "authors", 7)) {
   6008 		return NDB_FILTER_AUTHORS;
   6009 	} else if (len == 3 && !strncmp(tok, "ids", 3)) {
   6010 		return NDB_FILTER_IDS;
   6011 	} else if (len == 5 && !strncmp(tok, "kinds", 5)) {
   6012 		return NDB_FILTER_KINDS;
   6013 	} else if (len == 2 && tok[0] == '#') {
   6014 		*tagchar = tok[1];
   6015 		return NDB_FILTER_TAGS;
   6016 	} else if (len == 5 && !strncmp(tok, "since", 5)) {
   6017 		return NDB_FILTER_SINCE;
   6018 	} else if (len == 5 && !strncmp(tok, "until", 5)) {
   6019 		return NDB_FILTER_UNTIL;
   6020 	} else if (len == 5 && !strncmp(tok, "limit", 5)) {
   6021 		return NDB_FILTER_LIMIT;
   6022 	}
   6023 
   6024 	return 0;
   6025 }
   6026 
   6027 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser,
   6028 				     struct ndb_filter *filter)
   6029 {
   6030 	jsmntok_t *tok;
   6031 	const char *start;
   6032 	unsigned char hexbuf[32];
   6033 	int tok_len, i, size;
   6034 
   6035 	tok = &parser->toks[parser->i++];
   6036 
   6037 	if (tok->type != JSMN_ARRAY) {
   6038 		ndb_debug("parse_json_ids: not an array\n");
   6039 		return 0;
   6040 	}
   6041 
   6042 	size = tok->size;
   6043 
   6044 	for (i = 0; i < size; parser->i++, i++) {
   6045 		tok = &parser->toks[parser->i];
   6046 		start = parser->json + tok->start;
   6047 		tok_len = toksize(tok);
   6048 
   6049 		if (tok->type != JSMN_STRING) {
   6050 			ndb_debug("parse_json_ids: not a string '%d'\n", tok->type);
   6051 			return 0;
   6052 		}
   6053 
   6054 		if (tok_len != 64) {
   6055 			ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start);
   6056 			return 0;
   6057 		}
   6058 
   6059 		// id
   6060 		if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) {
   6061 			ndb_debug("parse_json_ids: hex decode failed\n");
   6062 			return 0;
   6063 		}
   6064 
   6065 		ndb_debug("adding id elem\n");
   6066 		if (!ndb_filter_add_id_element(filter, hexbuf)) {
   6067 			ndb_debug("parse_json_ids: failed to add id element\n");
   6068 			return 0;
   6069 		}
   6070 	}
   6071 
   6072 	parser->i--;
   6073 	return 1;
   6074 }
   6075 
   6076 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser,
   6077 				       struct ndb_filter *filter)
   6078 {
   6079 	jsmntok_t *tok;
   6080 	const char *start;
   6081 	int tok_len;
   6082 	unsigned char hexbuf[32];
   6083 	enum ndb_generic_element_type typ;
   6084 	tok = NULL;
   6085 	int i, size;
   6086 
   6087 	tok = &parser->toks[parser->i++];
   6088 
   6089 	if (tok->type != JSMN_ARRAY)
   6090 		return 0;
   6091 
   6092 	size = tok->size;
   6093 
   6094 	for (i = 0; i < size; i++, parser->i++) {
   6095 		tok = &parser->toks[parser->i];
   6096 		start = parser->json + tok->start;
   6097 		tok_len = toksize(tok);
   6098 
   6099 		if (tok->type != JSMN_STRING)
   6100 			return 0;
   6101 
   6102 		if (i == 0) {
   6103 			if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) {
   6104 				typ = NDB_ELEMENT_ID;
   6105 				if (!ndb_filter_add_id_element(filter, hexbuf)) {
   6106 					ndb_debug("failed to push id elem\n");
   6107 					return 0;
   6108 				}
   6109 			} else {
   6110 				typ = NDB_ELEMENT_STRING;
   6111 				if (!ndb_filter_add_str_element_len(filter, start, tok_len))
   6112 					return 0;
   6113 			}
   6114 		} else if (typ == NDB_ELEMENT_ID) {
   6115 			if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf)))
   6116 				return 0;
   6117 			if (!ndb_filter_add_id_element(filter, hexbuf))
   6118 				return 0;
   6119 		} else if (typ == NDB_ELEMENT_STRING) {
   6120 			if (!ndb_filter_add_str_element_len(filter, start, tok_len))
   6121 				return 0;
   6122 		} else {
   6123 			// ???
   6124 			return 0;
   6125 		}
   6126 	}
   6127 
   6128 	parser->i--;
   6129 	return 1;
   6130 }
   6131 
   6132 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser,
   6133 				     struct ndb_filter *filter)
   6134 {
   6135 	jsmntok_t *tok;
   6136 	const char *start;
   6137 	int tok_len;
   6138 	unsigned int value;
   6139 
   6140 	tok = &parser->toks[parser->i];
   6141 	start = parser->json + tok->start;
   6142 	tok_len = toksize(tok);
   6143 
   6144 	if (tok->type != JSMN_PRIMITIVE)
   6145 		return 0;
   6146 
   6147 	if (!parse_unsigned_int(start, tok_len, &value))
   6148 		return 0;
   6149 
   6150 	if (!ndb_filter_add_int_element(filter, (uint64_t)value))
   6151 		return 0;
   6152 
   6153 	ndb_debug("added int elem %d\n", value);
   6154 
   6155 	return 1;
   6156 }
   6157 
   6158 
   6159 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser,
   6160 				      struct ndb_filter *filter)
   6161 {
   6162 	jsmntok_t *tok;
   6163 	int size, i;
   6164 
   6165 	tok = &parser->toks[parser->i++];
   6166 
   6167 	if (tok->type != JSMN_ARRAY)
   6168 		return 0;
   6169 
   6170 	size = tok->size;
   6171 
   6172 	for (i = 0; i < size; parser->i++, i++) {
   6173 		if (!ndb_filter_parse_json_int(parser, filter))
   6174 			return 0;
   6175 	}
   6176 
   6177 	parser->i--;
   6178 	return 1;
   6179 }
   6180 
   6181 
   6182 static int ndb_filter_parse_json(struct ndb_json_parser *parser,
   6183 				 struct ndb_filter *filter)
   6184 {
   6185 	jsmntok_t *tok = NULL;
   6186 	const char *json = parser->json;
   6187 	const char *start;
   6188 	char tag;
   6189 	int tok_len;
   6190 	enum ndb_filter_fieldtype field;
   6191 
   6192 	if (parser->toks[parser->i++].type != JSMN_OBJECT)
   6193 		return 0;
   6194 
   6195 	for (; parser->i < parser->num_tokens; parser->i++) {
   6196 		tok = &parser->toks[parser->i++];
   6197 		start = json + tok->start;
   6198 		tok_len = toksize(tok);
   6199 
   6200 		if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) {
   6201 			ndb_debug("failed field '%.*s'\n", tok_len, start);
   6202 			continue;
   6203 		}
   6204 
   6205 		if (tag) {
   6206 			ndb_debug("starting tag field '%c'\n", tag);
   6207 			if (!ndb_filter_start_tag_field(filter, tag)) {
   6208 				ndb_debug("failed to start tag field '%c'\n", tag);
   6209 				return 0;
   6210 			}
   6211 		} else if (!ndb_filter_start_field(filter, field)) {
   6212 			ndb_debug("field already started\n");
   6213 			return 0;
   6214 		}
   6215 
   6216 		// we parsed a top-level field
   6217 		switch(field) {
   6218 		case NDB_FILTER_AUTHORS:
   6219 		case NDB_FILTER_IDS:
   6220 			if (!ndb_filter_parse_json_ids(parser, filter)) {
   6221 				ndb_debug("failed to parse filter ids/authors\n");
   6222 				return 0;
   6223 			}
   6224 			break;
   6225 		case NDB_FILTER_SINCE:
   6226 		case NDB_FILTER_UNTIL:
   6227 		case NDB_FILTER_LIMIT:
   6228 			if (!ndb_filter_parse_json_int(parser, filter)) {
   6229 				ndb_debug("failed to parse filter since/until/limit\n");
   6230 				return 0;
   6231 			}
   6232 			break;
   6233 		case NDB_FILTER_KINDS:
   6234 			if (!ndb_filter_parse_json_ints(parser, filter)) {
   6235 				ndb_debug("failed to parse filter kinds\n");
   6236 				return 0;
   6237 			}
   6238 			break;
   6239 		case NDB_FILTER_TAGS:
   6240 			if (!ndb_filter_parse_json_elems(parser, filter)) {
   6241 				ndb_debug("failed to parse filter tags\n");
   6242 				return 0;
   6243 			}
   6244 			break;
   6245 		}
   6246 
   6247 		ndb_filter_end_field(filter);
   6248 	}
   6249 
   6250 	return ndb_filter_end(filter);
   6251 }
   6252 
   6253 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note)
   6254 {
   6255 	jsmntok_t *tok = NULL;
   6256 	unsigned char hexbuf[64];
   6257 	const char *json = parser->json;
   6258 	const char *start;
   6259 	int i, tok_len, parsed;
   6260 
   6261 	parsed = 0;
   6262 
   6263 	if (parser->toks[parser->i].type != JSMN_OBJECT)
   6264 		return 0;
   6265 
   6266 	// TODO: build id buffer and verify at end
   6267 
   6268 	for (i = parser->i + 1; i < parser->num_tokens; i++) {
   6269 		tok = &parser->toks[i];
   6270 		start = json + tok->start;
   6271 		tok_len = toksize(tok);
   6272 
   6273 		//printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type);
   6274 		if (tok_len == 0 || i + 1 >= parser->num_tokens)
   6275 			continue;
   6276 
   6277 		if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) {
   6278 			// pubkey
   6279 			tok = &parser->toks[i+1];
   6280 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   6281 			parsed |= NDB_PARSED_PUBKEY;
   6282 			ndb_builder_set_pubkey(&parser->builder, hexbuf);
   6283 		} else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') {
   6284 			// id
   6285 			tok = &parser->toks[i+1];
   6286 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   6287 			parsed |= NDB_PARSED_ID;
   6288 			ndb_builder_set_id(&parser->builder, hexbuf);
   6289 		} else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') {
   6290 			// sig
   6291 			tok = &parser->toks[i+1];
   6292 			hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf));
   6293 			parsed |= NDB_PARSED_SIG;
   6294 			ndb_builder_set_sig(&parser->builder, hexbuf);
   6295 		} else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) {
   6296 			// kind
   6297 			tok = &parser->toks[i+1];
   6298 			start = json + tok->start;
   6299 			if (tok->type != JSMN_PRIMITIVE || tok_len <= 0)
   6300 				return 0;
   6301 			if (!parse_unsigned_int(start, toksize(tok),
   6302 						&parser->builder.note->kind))
   6303 					return 0;
   6304 			parsed |= NDB_PARSED_KIND;
   6305 		} else if (start[0] == 'c') {
   6306 			if (jsoneq(json, tok, tok_len, "created_at")) {
   6307 				// created_at
   6308 				tok = &parser->toks[i+1];
   6309 				start = json + tok->start;
   6310 				if (tok->type != JSMN_PRIMITIVE || tok_len <= 0)
   6311 					return 0;
   6312 				// TODO: update to int64 in 2106 ... xD
   6313 				unsigned int bigi;
   6314 				if (!parse_unsigned_int(start, toksize(tok), &bigi))
   6315 					return 0;
   6316 				parser->builder.note->created_at = bigi;
   6317 				parsed |= NDB_PARSED_CREATED_AT;
   6318 			} else if (jsoneq(json, tok, tok_len, "content")) {
   6319 				// content
   6320 				tok = &parser->toks[i+1];
   6321 				union ndb_packed_str pstr;
   6322 				tok_len = toksize(tok);
   6323 				int written, pack_ids = 0;
   6324 				if (!ndb_builder_make_json_str(&parser->builder,
   6325 							json + tok->start,
   6326 							tok_len, &pstr,
   6327 							&written, pack_ids)) {
   6328 					ndb_debug("ndb_builder_make_json_str failed\n");
   6329 					return 0;
   6330 				}
   6331 				parser->builder.note->content_length = written;
   6332 				parser->builder.note->content = pstr;
   6333 				parsed |= NDB_PARSED_CONTENT;
   6334 			}
   6335 		} else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) {
   6336 			tok = &parser->toks[i+1];
   6337 			ndb_builder_process_json_tags(parser, tok);
   6338 			i += tok->size;
   6339 			parsed |= NDB_PARSED_TAGS;
   6340 		}
   6341 	}
   6342 
   6343 	//ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL);
   6344 	if (parsed != NDB_PARSED_ALL)
   6345 		return 0;
   6346 
   6347 	return ndb_builder_finalize(&parser->builder, note, NULL);
   6348 }
   6349 
   6350 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter,
   6351 			 unsigned char *buf, int bufsize)
   6352 {
   6353 	struct ndb_json_parser parser;
   6354 	int res;
   6355 
   6356 	if (filter->finalized)
   6357 		return 0;
   6358 
   6359 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   6360 	if ((res = ndb_json_parser_parse(&parser, NULL)) < 0)
   6361 		return res;
   6362 
   6363 	if (parser.num_tokens < 1)
   6364 		return 0;
   6365 
   6366 	return ndb_filter_parse_json(&parser, filter);
   6367 }
   6368 
   6369 int ndb_note_from_json(const char *json, int len, struct ndb_note **note,
   6370 		       unsigned char *buf, int bufsize)
   6371 {
   6372 	struct ndb_json_parser parser;
   6373 	int res;
   6374 
   6375 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
   6376 	if ((res = ndb_json_parser_parse(&parser, NULL)) < 0)
   6377 		return res;
   6378 
   6379 	if (parser.num_tokens < 1)
   6380 		return 0;
   6381 
   6382 	return ndb_parse_json_note(&parser, note);
   6383 }
   6384 
   6385 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey)
   6386 {
   6387 	memcpy(builder->note->pubkey, pubkey, 32);
   6388 }
   6389 
   6390 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id)
   6391 {
   6392 	memcpy(builder->note->id, id, 32);
   6393 }
   6394 
   6395 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig)
   6396 {
   6397 	memcpy(builder->note->sig, sig, 64);
   6398 }
   6399 
   6400 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind)
   6401 {
   6402 	builder->note->kind = kind;
   6403 }
   6404 
   6405 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at)
   6406 {
   6407 	builder->note->created_at = created_at;
   6408 }
   6409 
   6410 int ndb_builder_new_tag(struct ndb_builder *builder)
   6411 {
   6412 	builder->note->tags.count++;
   6413 	struct ndb_tag tag = {0};
   6414 	builder->current_tag = (struct ndb_tag *)builder->note_cur.p;
   6415 	return cursor_push_tag(&builder->note_cur, &tag);
   6416 }
   6417 
   6418 void ndb_stat_counts_init(struct ndb_stat_counts *counts)
   6419 {
   6420 	counts->count = 0;
   6421 	counts->key_size = 0;
   6422 	counts->value_size = 0;
   6423 }
   6424 
   6425 static void ndb_stat_init(struct ndb_stat *stat)
   6426 {
   6427 	// init stats
   6428 	int i;
   6429 
   6430 	for (i = 0; i < NDB_CKIND_COUNT; i++) {
   6431 		ndb_stat_counts_init(&stat->common_kinds[i]);
   6432 	}
   6433 
   6434 	for (i = 0; i < NDB_DBS; i++) {
   6435 		ndb_stat_counts_init(&stat->dbs[i]);
   6436 	}
   6437 
   6438 	ndb_stat_counts_init(&stat->other_kinds);
   6439 }
   6440 
   6441 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat)
   6442 {
   6443 	int rc;
   6444 	MDB_cursor *cur;
   6445 	MDB_val k, v;
   6446 	MDB_dbi db;
   6447 	struct ndb_txn txn;
   6448 	struct ndb_note *note;
   6449 	int i;
   6450 	enum ndb_common_kind common_kind;
   6451 
   6452 	// initialize to 0
   6453 	ndb_stat_init(stat);
   6454 
   6455 	if (!ndb_begin_query(ndb, &txn)) {
   6456 		fprintf(stderr, "ndb_stat failed at ndb_begin_query\n");
   6457 		return 0;
   6458 	}
   6459 
   6460 	// stat each dbi in the database
   6461 	for (i = 0; i < NDB_DBS; i++)
   6462 	{
   6463 		db = ndb->lmdb.dbs[i];
   6464 
   6465 		if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) {
   6466 			fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n",
   6467 					mdb_strerror(rc));
   6468 			return 0;
   6469 		}
   6470 
   6471 		// loop over every entry and count kv sizes
   6472 		while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6473 			// we gather more detailed per-kind stats if we're in
   6474 			// the notes db
   6475 			if (i == NDB_DB_NOTE) {
   6476 				note = v.mv_data;
   6477 				common_kind = ndb_kind_to_common_kind(note->kind);
   6478 
   6479 				// uncommon kind? just count them in bulk
   6480 				if ((int)common_kind == -1) {
   6481 					stat->other_kinds.count++;
   6482 					stat->other_kinds.key_size += k.mv_size;
   6483 					stat->other_kinds.value_size += v.mv_size;
   6484 				} else {
   6485 					stat->common_kinds[common_kind].count++;
   6486 					stat->common_kinds[common_kind].key_size += k.mv_size;
   6487 					stat->common_kinds[common_kind].value_size += v.mv_size;
   6488 				}
   6489 			}
   6490 
   6491 			stat->dbs[i].count++;
   6492 			stat->dbs[i].key_size += k.mv_size;
   6493 			stat->dbs[i].value_size += v.mv_size;
   6494 		}
   6495 
   6496 		// close the cursor, they are per-dbi
   6497 		mdb_cursor_close(cur);
   6498 	}
   6499 
   6500 	ndb_end_query(&txn);
   6501 
   6502 	return 1;
   6503 }
   6504 
   6505 /// Push an element to the current tag
   6506 ///
   6507 /// Basic idea is to call ndb_builder_new_tag
   6508 int ndb_builder_push_tag_str(struct ndb_builder *builder,
   6509 				    const char *str, int len)
   6510 {
   6511 	union ndb_packed_str pstr;
   6512 	int pack_ids = 1;
   6513 	if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids))
   6514 		return 0;
   6515 	return ndb_builder_finalize_tag(builder, pstr);
   6516 }
   6517 
   6518 //
   6519 // CONFIG
   6520 //
   6521 void ndb_default_config(struct ndb_config *config)
   6522 {
   6523 	int cores = get_cpu_cores();
   6524 	config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB
   6525 	config->ingester_threads = cores == -1 ? 4 : cores;
   6526 	config->flags = 0;
   6527 	config->ingest_filter = NULL;
   6528 	config->filter_context = NULL;
   6529 	config->sub_cb_ctx = NULL;
   6530 	config->sub_cb = NULL;
   6531 }
   6532 
   6533 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context)
   6534 {
   6535 	config->sub_cb_ctx = context;
   6536 	config->sub_cb = fn;
   6537 }
   6538 
   6539 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
   6540 {
   6541 	config->ingester_threads = threads;
   6542 }
   6543 
   6544 void ndb_config_set_flags(struct ndb_config *config, int flags)
   6545 {
   6546 	config->flags = flags;
   6547 }
   6548 
   6549 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize)
   6550 {
   6551 	config->mapsize = mapsize;
   6552 }
   6553 
   6554 void ndb_config_set_ingest_filter(struct ndb_config *config,
   6555 				  ndb_ingest_filter_fn fn, void *filter_ctx)
   6556 {
   6557 	config->ingest_filter = fn;
   6558 	config->filter_context = filter_ctx;
   6559 }
   6560 
   6561 int ndb_print_tag_index(struct ndb_txn *txn)
   6562 {
   6563 	MDB_cursor *cur;
   6564 	MDB_val k, v;
   6565 	int i;
   6566 
   6567 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur))
   6568 		return 0;
   6569 
   6570 	i = 1;
   6571 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6572 		printf("%d ", i);
   6573 		print_tag_kv(txn, &k, &v);
   6574 		i++;
   6575 	}
   6576 
   6577 	return 1;
   6578 }
   6579 
   6580 int ndb_print_kind_keys(struct ndb_txn *txn)
   6581 {
   6582 	MDB_cursor *cur;
   6583 	MDB_val k, v;
   6584 	int i;
   6585 	struct ndb_u64_ts *tsid;
   6586 
   6587 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur))
   6588 		return 0;
   6589 
   6590 	i = 1;
   6591 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6592 		tsid = k.mv_data;
   6593 		printf("%d note_kind %" PRIu64 " %" PRIu64 "\n",
   6594 			i, tsid->u64, tsid->timestamp);
   6595 
   6596 		i++;
   6597 	}
   6598 
   6599 	return 1;
   6600 }
   6601 
   6602 // used by ndb.c
   6603 int ndb_print_search_keys(struct ndb_txn *txn)
   6604 {
   6605 	MDB_cursor *cur;
   6606 	MDB_val k, v;
   6607 	int i;
   6608 	struct ndb_text_search_key search_key;
   6609 
   6610 	if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur))
   6611 		return 0;
   6612 
   6613 	i = 1;
   6614 	while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
   6615 		if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) {
   6616 			fprintf(stderr, "error decoding key %d\n", i);
   6617 			continue;
   6618 		}
   6619 
   6620 		ndb_print_text_search_key(&search_key);
   6621 		printf("\n");
   6622 
   6623 		i++;
   6624 	}
   6625 
   6626 	return 1;
   6627 }
   6628 
   6629 struct ndb_tags *ndb_note_tags(struct ndb_note *note)
   6630 {
   6631 	return &note->tags;
   6632 }
   6633 
   6634 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr)
   6635 {
   6636 	struct ndb_str str;
   6637 	str.flag = pstr->packed.flag;
   6638 
   6639 	if (str.flag == NDB_PACKED_STR) {
   6640 		str.str = pstr->packed.str;
   6641 		return str;
   6642 	}
   6643 
   6644 	str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF);
   6645 	return str;
   6646 }
   6647 
   6648 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind)
   6649 {
   6650 	return ndb_note_str(note, &tag->strs[ind]);
   6651 }
   6652 
   6653 int ndb_str_len(struct ndb_str *str)
   6654 {
   6655 	if (str->flag == NDB_PACKED_ID)
   6656 		return 32;
   6657 	return strlen(str->str);
   6658 }
   6659 
   6660 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind)
   6661 {
   6662 	return ndb_tag_str(iter->note, iter->tag, ind);
   6663 }
   6664 
   6665 unsigned char * ndb_note_id(struct ndb_note *note)
   6666 {
   6667 	return note->id;
   6668 }
   6669 
   6670 unsigned char * ndb_note_pubkey(struct ndb_note *note)
   6671 {
   6672 	return note->pubkey;
   6673 }
   6674 
   6675 unsigned char * ndb_note_sig(struct ndb_note *note)
   6676 {
   6677 	return note->sig;
   6678 }
   6679 
   6680 uint32_t ndb_note_created_at(struct ndb_note *note)
   6681 {
   6682 	return note->created_at;
   6683 }
   6684 
   6685 uint32_t ndb_note_kind(struct ndb_note *note)
   6686 {
   6687 	return note->kind;
   6688 }
   6689 
   6690 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind)
   6691 {
   6692 	note->kind = kind;
   6693 }
   6694 
   6695 const char *ndb_note_content(struct ndb_note *note)
   6696 {
   6697 	return ndb_note_str(note, &note->content).str;
   6698 }
   6699 
   6700 uint32_t ndb_note_content_length(struct ndb_note *note)
   6701 {
   6702 	return note->content_length;
   6703 }
   6704 
   6705 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes)
   6706 {
   6707 	struct ndb_note *note = (struct ndb_note *)bytes;
   6708 	if (note->version != 1)
   6709 		return 0;
   6710 	return note;
   6711 }
   6712 
   6713 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter)
   6714 {
   6715 	iter->note = note;
   6716 	iter->tag = NULL;
   6717 	iter->index = -1;
   6718 }
   6719 
   6720 // Helper function to get a pointer to the nth tag
   6721 static struct ndb_tag *ndb_tags_tag(struct ndb_tags *tags, size_t index) {
   6722     return (struct ndb_tag *)((uint8_t *)tags + sizeof(struct ndb_tags) + index * sizeof(struct ndb_tag));
   6723 }
   6724 
   6725 int ndb_tags_iterate_next(struct ndb_iterator *iter)
   6726 {
   6727 	struct ndb_tags *tags;
   6728 
   6729 	if (iter->tag == NULL || iter->index == -1) {
   6730 		iter->tag = ndb_tags_tag(&iter->note->tags, 0);
   6731 		iter->index = 0;
   6732 		return iter->note->tags.count != 0;
   6733 	}
   6734 
   6735 	tags = &iter->note->tags;
   6736 
   6737 	if (++iter->index < tags->count) {
   6738 		uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]);
   6739 		iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size);
   6740 		return 1;
   6741 	}
   6742 
   6743 	return 0;
   6744 }
   6745 
   6746 uint16_t ndb_tags_count(struct ndb_tags *tags)
   6747 {
   6748 	return tags->count;
   6749 }
   6750 
   6751 uint16_t ndb_tag_count(struct ndb_tag *tags)
   6752 {
   6753 	return tags->count;
   6754 }
   6755 
   6756 enum ndb_common_kind ndb_kind_to_common_kind(int kind)
   6757 {
   6758 	switch (kind)
   6759 	{
   6760 		case 0:     return NDB_CKIND_PROFILE;
   6761 		case 1:     return NDB_CKIND_TEXT;
   6762 		case 3:     return NDB_CKIND_CONTACTS;
   6763 		case 4:     return NDB_CKIND_DM;
   6764 		case 5:     return NDB_CKIND_DELETE;
   6765 		case 6:     return NDB_CKIND_REPOST;
   6766 		case 7:     return NDB_CKIND_REACTION;
   6767 		case 9735:  return NDB_CKIND_ZAP;
   6768 		case 9734:  return NDB_CKIND_ZAP_REQUEST;
   6769 		case 23194: return NDB_CKIND_NWC_REQUEST;
   6770 		case 23195: return NDB_CKIND_NWC_RESPONSE;
   6771 		case 27235: return NDB_CKIND_HTTP_AUTH;
   6772 		case 30000: return NDB_CKIND_LIST;
   6773 		case 30023: return NDB_CKIND_LONGFORM;
   6774 		case 30315: return NDB_CKIND_STATUS;
   6775 	}
   6776 
   6777 	return -1;
   6778 }
   6779 
   6780 const char *ndb_kind_name(enum ndb_common_kind ck)
   6781 {
   6782 	switch (ck) {
   6783 		case NDB_CKIND_PROFILE:      return "profile";
   6784 		case NDB_CKIND_TEXT:         return "text";
   6785 		case NDB_CKIND_CONTACTS:     return "contacts";
   6786 		case NDB_CKIND_DM:           return "dm";
   6787 		case NDB_CKIND_DELETE:       return "delete";
   6788 		case NDB_CKIND_REPOST:       return "repost";
   6789 		case NDB_CKIND_REACTION:     return "reaction";
   6790 		case NDB_CKIND_ZAP:          return "zap";
   6791 		case NDB_CKIND_ZAP_REQUEST:  return "zap_request";
   6792 		case NDB_CKIND_NWC_REQUEST:  return "nwc_request";
   6793 		case NDB_CKIND_NWC_RESPONSE: return "nwc_response";
   6794 		case NDB_CKIND_HTTP_AUTH:    return "http_auth";
   6795 		case NDB_CKIND_LIST:         return "list";
   6796 		case NDB_CKIND_LONGFORM:     return "longform";
   6797 		case NDB_CKIND_STATUS:       return "status";
   6798 		case NDB_CKIND_COUNT:        return "unknown";
   6799 	}
   6800 
   6801 	return "unknown";
   6802 }
   6803 
   6804 const char *ndb_db_name(enum ndb_dbs db)
   6805 {
   6806 	switch (db) {
   6807 		case NDB_DB_NOTE:
   6808 			return "note";
   6809 		case NDB_DB_META:
   6810 			return "note_metadata";
   6811 		case NDB_DB_PROFILE:
   6812 			return "profile";
   6813 		case NDB_DB_NOTE_ID:
   6814 			return "note_index";
   6815 		case NDB_DB_PROFILE_PK:
   6816 			return "profile_pubkey_index";
   6817 		case NDB_DB_NDB_META:
   6818 			return "nostrdb_metadata";
   6819 		case NDB_DB_PROFILE_SEARCH:
   6820 			return "profile_search";
   6821 		case NDB_DB_PROFILE_LAST_FETCH:
   6822 			return "profile_last_fetch";
   6823 		case NDB_DB_NOTE_KIND:
   6824 			return "note_kind_index";
   6825 		case NDB_DB_NOTE_TEXT:
   6826 			return "note_fulltext";
   6827 		case NDB_DB_NOTE_BLOCKS:
   6828 			return "note_blocks";
   6829 		case NDB_DB_NOTE_TAGS:
   6830 			return "note_tags";
   6831 		case NDB_DB_NOTE_PUBKEY:
   6832 			return "note_pubkey_index";
   6833 		case NDB_DB_NOTE_PUBKEY_KIND:
   6834 			return "note_pubkey_kind_index";
   6835 		case NDB_DBS:
   6836 			return "count";
   6837 	}
   6838 
   6839 	return "unknown";
   6840 }
   6841 
   6842 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note)
   6843 {
   6844 	const char *content;
   6845 	size_t content_len;
   6846 	struct ndb_blocks *blocks;
   6847 
   6848 	content = ndb_note_content(note);
   6849 	content_len = ndb_note_content_length(note);
   6850 
   6851 	// something weird is going on
   6852 	if (content_len >= INT32_MAX)
   6853 		return NULL;
   6854 
   6855 	unsigned char *buffer = malloc(content_len);
   6856 	if (!buffer)
   6857 		return NULL;
   6858 
   6859 	if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) {
   6860 		free(buffer);
   6861 		return NULL;
   6862 	}
   6863 
   6864 	//blocks = realloc(blocks, ndb_blocks_total_size(blocks));
   6865 	//if (blocks == NULL)
   6866 		//return NULL;
   6867 
   6868 	blocks->flags |= NDB_BLOCK_FLAG_OWNED;
   6869 
   6870 	return blocks;
   6871 }
   6872 
   6873 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key)
   6874 {
   6875 	struct ndb_blocks *blocks, *blocks_to_writer;
   6876 	size_t blocks_size;
   6877 	struct ndb_note *note;
   6878 	size_t note_len;
   6879 
   6880 	if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, &note_len))) {
   6881 		return blocks;
   6882 	}
   6883 
   6884 	// If we don't have note blocks, let's lazily generate them. This is
   6885 	// migration-friendly instead of doing them all at once
   6886 	if (!(note = ndb_get_note_by_key(txn, note_key, &note_len))) {
   6887 		// no note found, can't return note blocks
   6888 		return NULL;
   6889 	}
   6890 
   6891 	 if (!(blocks = ndb_note_to_blocks(note)))
   6892 		 return NULL;
   6893 
   6894 	 // send a copy to the writer
   6895 	 blocks_size = ndb_blocks_total_size(blocks);
   6896 	 blocks_to_writer = malloc(blocks_size);
   6897 	 memcpy(blocks_to_writer, blocks, blocks_size);
   6898 	 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED);
   6899 
   6900 	 // we generated new blocks, let's store them in the DB
   6901 	 struct ndb_writer_blocks write_blocks = {
   6902 		 .blocks = blocks_to_writer,
   6903 		 .note_key = note_key
   6904 	 };
   6905 
   6906 	 assert(write_blocks.blocks != blocks);
   6907 
   6908 	 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS };
   6909 	 msg.blocks = write_blocks;
   6910 
   6911 	 ndb_writer_queue_msg(&ndb->writer, &msg);
   6912 
   6913 	 return blocks;
   6914 }
   6915 
   6916 // please call ndb_monitor_lock before calling this
   6917 static struct ndb_subscription *
   6918 ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index)
   6919 {
   6920 	struct ndb_subscription *sub, *tsub;
   6921 	int i;
   6922 
   6923 	for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) {
   6924 		tsub = &monitor->subscriptions[i];
   6925 		if (tsub->subid == subid) {
   6926 			sub = tsub;
   6927 			if (index)
   6928 				*index = i;
   6929 			break;
   6930 		}
   6931 	}
   6932 
   6933 	return sub;
   6934 }
   6935 
   6936 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
   6937 		       int note_id_capacity)
   6938 {
   6939 	struct ndb_subscription *sub;
   6940 	int res;
   6941 
   6942 	if (subid == 0)
   6943 		return 0;
   6944 
   6945 	ndb_monitor_lock(&ndb->monitor);
   6946 
   6947 	if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL)))
   6948 		res = 0;
   6949 	else
   6950 		res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
   6951 
   6952 	ndb_monitor_unlock(&ndb->monitor);
   6953 
   6954 	return res;
   6955 }
   6956 
   6957 int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
   6958                        int note_id_capacity)
   6959 {
   6960 	struct ndb_subscription *sub;
   6961 	struct prot_queue *queue_inbox;
   6962 
   6963         // this is not a valid subscription id
   6964 	if (subid == 0)
   6965 		return 0;
   6966 
   6967 	ndb_monitor_lock(&ndb->monitor);
   6968 
   6969         if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) {
   6970 		ndb_monitor_unlock(&ndb->monitor);
   6971 		return 0;
   6972 	}
   6973 
   6974 	queue_inbox = &sub->inbox;
   6975 
   6976 	ndb_monitor_unlock(&ndb->monitor);
   6977 
   6978 	// there is technically a race condition if the thread yeilds at this
   6979 	// comment and a subscription is added/removed. A deadlock in the
   6980 	// writer queue would be much worse though. This function is dubious
   6981 	// anyways.
   6982 
   6983         return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity);
   6984 }
   6985 
   6986 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
   6987 {
   6988 	struct ndb_subscription *sub;
   6989 	int index, res, elems_to_move;
   6990 
   6991 	ndb_monitor_lock(&ndb->monitor);
   6992 
   6993 	if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) {
   6994 		res = 0;
   6995 		goto done;
   6996 	}
   6997 
   6998 	ndb_subscription_destroy(sub);
   6999 
   7000 	elems_to_move = (--ndb->monitor.num_subscriptions) - index;
   7001 
   7002 	memmove(&ndb->monitor.subscriptions[index],
   7003 		&ndb->monitor.subscriptions[index+1],
   7004 		elems_to_move * sizeof(*sub));
   7005 
   7006 	res = 1;
   7007 
   7008 done:
   7009 	ndb_monitor_unlock(&ndb->monitor);
   7010 
   7011 	return res;
   7012 }
   7013 
   7014 int ndb_num_subscriptions(struct ndb *ndb)
   7015 {
   7016 	return ndb->monitor.num_subscriptions;
   7017 }
   7018 
   7019 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters)
   7020 {
   7021 	static uint64_t subids = 0;
   7022 	struct ndb_subscription *sub;
   7023 	size_t buflen;
   7024 	uint64_t subid;
   7025 	char *buf;
   7026 
   7027 	ndb_monitor_lock(&ndb->monitor);
   7028 
   7029 	if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
   7030 		fprintf(stderr, "too many subscriptions\n");
   7031 		subid = 0;
   7032 		goto done;
   7033 	}
   7034 
   7035 	sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions];
   7036 	subid = ++subids;
   7037 	sub->subid = subid;
   7038 
   7039 	ndb_filter_group_init(&sub->group);
   7040 	if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) {
   7041 		subid = 0;
   7042 		goto done;
   7043 	}
   7044 
   7045 	// 500k ought to be enough for anyone
   7046 	buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE;
   7047 	buf = malloc(buflen);
   7048 
   7049 	if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
   7050 		fprintf(stderr, "failed to push prot queue\n");
   7051 		subid = 0;
   7052 		goto done;
   7053 	}
   7054 
   7055 	ndb->monitor.num_subscriptions++;
   7056 done:
   7057 	ndb_monitor_unlock(&ndb->monitor);
   7058 
   7059 	return subid;
   7060 }