nostrdb.c (107753B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "sha256.h" 8 #include "lmdb.h" 9 #include "util.h" 10 #include "cpu.h" 11 #include "threadpool.h" 12 #include "protected_queue.h" 13 #include "memchr.h" 14 #include <stdlib.h> 15 #include <limits.h> 16 #include <assert.h> 17 18 #include "bindings/c/profile_json_parser.h" 19 #include "bindings/c/profile_builder.h" 20 #include "bindings/c/meta_builder.h" 21 #include "bindings/c/meta_reader.h" 22 #include "bindings/c/profile_verifier.h" 23 #include "secp256k1.h" 24 #include "secp256k1_ecdh.h" 25 #include "secp256k1_schnorrsig.h" 26 27 #define max(a,b) ((a) > (b) ? (a) : (b)) 28 #define min(a,b) ((a) < (b) ? (a) : (b)) 29 30 // the maximum number of things threads pop and push in bulk 31 static const int THREAD_QUEUE_BATCH = 4096; 32 33 // the maximum size of inbox queues 34 static const int DEFAULT_QUEUE_SIZE = 1000000; 35 36 // increase if we need bigger filters 37 #define NDB_FILTER_PAGES 64 38 39 #define ndb_flag_set(flags, f) ((flags & f) == f) 40 41 #define NDB_PARSED_ID (1 << 0) 42 #define NDB_PARSED_PUBKEY (1 << 1) 43 #define NDB_PARSED_SIG (1 << 2) 44 #define NDB_PARSED_CREATED_AT (1 << 3) 45 #define NDB_PARSED_KIND (1 << 4) 46 #define NDB_PARSED_CONTENT (1 << 5) 47 #define NDB_PARSED_TAGS (1 << 6) 48 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 49 50 typedef int (*ndb_migrate_fn)(struct ndb *); 51 typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, 52 int word_index); 53 54 struct ndb_migration { 55 ndb_migrate_fn fn; 56 }; 57 58 struct ndb_profile_record_builder { 59 flatcc_builder_t *builder; 60 void *flatbuf; 61 }; 62 63 // controls whether to continue or stop the json parser 64 enum ndb_idres { 65 NDB_IDRES_CONT, 66 NDB_IDRES_STOP, 67 }; 68 69 // closure data for the id-detecting ingest controller 70 struct ndb_ingest_controller 71 { 72 MDB_txn *read_txn; 73 struct ndb_lmdb *lmdb; 74 }; 75 76 enum ndb_writer_msgtype { 77 NDB_WRITER_QUIT, // kill thread immediately 78 NDB_WRITER_NOTE, // write a note to the db 79 NDB_WRITER_PROFILE, // write a profile to the db 80 NDB_WRITER_DBMETA, // write ndb metadata 81 NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched 82 }; 83 84 // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) 85 enum ndb_meta_key { 86 NDB_META_KEY_VERSION = 1 87 }; 88 89 struct ndb_json_parser { 90 const char *json; 91 int json_len; 92 struct ndb_builder builder; 93 jsmn_parser json_parser; 94 jsmntok_t *toks, *toks_end; 95 int i; 96 int num_tokens; 97 }; 98 99 // useful to pass to threads on its own 100 struct ndb_lmdb { 101 MDB_env *env; 102 MDB_dbi dbs[NDB_DBS]; 103 }; 104 105 struct ndb_writer { 106 struct ndb_lmdb *lmdb; 107 108 void *queue_buf; 109 int queue_buflen; 110 pthread_t thread_id; 111 112 struct prot_queue inbox; 113 }; 114 115 struct ndb_ingester { 116 uint32_t flags; 117 struct threadpool tp; 118 struct ndb_writer *writer; 119 void *filter_context; 120 ndb_ingest_filter_fn filter; 121 }; 122 123 124 struct ndb { 125 struct ndb_lmdb lmdb; 126 struct ndb_ingester ingester; 127 struct ndb_writer writer; 128 int version; 129 uint32_t flags; // setting flags 130 // lmdb environ handles, etc 131 }; 132 133 134 // A clustered key with an id and a timestamp 135 struct ndb_tsid { 136 unsigned char id[32]; 137 uint64_t timestamp; 138 }; 139 140 // A u64 + timestamp id. Just using this for kinds at the moment. 141 struct ndb_u64_tsid { 142 uint64_t u64; // kind, etc 143 uint64_t timestamp; 144 }; 145 146 struct ndb_word 147 { 148 const char *word; 149 int word_len; 150 }; 151 152 struct ndb_search_words 153 { 154 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 155 int num_words; 156 }; 157 158 // ndb_text_search_key 159 // 160 // This is compressed when in lmdb: 161 // 162 // note_id: varint 163 // strlen: varint 164 // str: cstr 165 // timestamp: varint 166 // word_index: varint 167 // 168 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 169 int word_index, int word_len, const char *str, 170 uint64_t timestamp, uint64_t note_id, 171 int *keysize) 172 { 173 struct cursor cur; 174 int size, pad; 175 make_cursor(buf, buf + bufsize, &cur); 176 177 // TODO: need update this to uint64_t 178 // we push this first because our query function can pull this off 179 // quicky to check matches 180 if (!push_varint(&cur, (int32_t)note_id)) 181 return 0; 182 183 // string length 184 if (!push_varint(&cur, word_len)) 185 return 0; 186 187 // non-null terminated, lowercase string 188 if (!cursor_push_lowercase(&cur, str, word_len)) 189 return 0; 190 191 // TODO: need update this to uint64_t 192 if (!push_varint(&cur, (int)timestamp)) 193 return 0; 194 195 // the index of the word in the content so that we can do more accurate 196 // phrase searches 197 if (!push_varint(&cur, word_index)) 198 return 0; 199 200 size = cur.p - cur.start; 201 202 // pad to 8-byte alignment 203 pad = ((size + 7) & ~7) - size; 204 if (pad > 0) { 205 if (!cursor_memset(&cur, 0, pad)) { 206 return 0; 207 } 208 } 209 210 *keysize = cur.p - cur.start; 211 assert((*keysize % 8) == 0); 212 213 return 1; 214 } 215 216 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 217 int wordlen, const char *word, 218 uint64_t timestamp, uint64_t note_id, 219 int *keysize) 220 { 221 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 222 timestamp, note_id, keysize); 223 } 224 225 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 226 int wordlen, const char *word, 227 int *keysize) 228 { 229 uint64_t timestamp, note_id; 230 timestamp = 0; 231 note_id = 0; 232 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 233 timestamp, note_id, keysize); 234 } 235 236 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 237 int wordlen, const char *word, 238 int *keysize) 239 { 240 uint64_t timestamp, note_id; 241 timestamp = INT32_MAX; 242 note_id = INT32_MAX; 243 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 244 timestamp, note_id, keysize); 245 } 246 247 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize); 248 249 /** From LMDB: Compare two items lexically */ 250 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 251 int diff; 252 ssize_t len_diff; 253 unsigned int len; 254 255 len = a->mv_size; 256 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 257 if (len_diff > 0) { 258 len = b->mv_size; 259 len_diff = 1; 260 } 261 262 diff = memcmp(a->mv_data, b->mv_data, len); 263 return diff ? diff : len_diff<0 ? -1 : len_diff; 264 } 265 266 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 267 { 268 struct cursor ca, cb; 269 int sa, sb; 270 int nid_a, nid_b; 271 MDB_val a2, b2; 272 273 make_cursor(a->mv_data, a->mv_data + a->mv_size, &ca); 274 make_cursor(b->mv_data, b->mv_data + b->mv_size, &cb); 275 276 // note_id 277 if (unlikely(!pull_varint(&ca, &nid_a) || !pull_varint(&cb, &nid_b))) 278 return 0; 279 280 // string size 281 if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb))) 282 return 0; 283 284 a2.mv_data = ca.p; 285 a2.mv_size = sa; 286 287 b2.mv_data = cb.p; 288 b2.mv_size = sb; 289 290 int cmp = mdb_cmp_memn(&a2, &b2); 291 if (cmp) return cmp; 292 293 // skip over string 294 ca.p += sa; 295 cb.p += sb; 296 297 // timestamp 298 if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb))) 299 return 0; 300 301 if (sa < sb) return -1; 302 else if (sa > sb) return 1; 303 304 // note_id 305 if (nid_a < nid_b) return -1; 306 else if (nid_a > nid_b) return 1; 307 308 // word index 309 if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb))) 310 return 0; 311 312 if (sa < sb) return -1; 313 else if (sa > sb) return 1; 314 315 return 0; 316 } 317 318 static inline int ndb_unpack_text_search_key_noteid( 319 struct cursor *cur, uint64_t *note_id) 320 { 321 int inote_id; 322 if (!pull_varint(cur, &inote_id)) 323 return 0; 324 325 *note_id = inote_id; 326 return 1; 327 } 328 329 // faster peek of just the string instead of unpacking everything 330 // this is used to quickly discard range query matches if there is no 331 // common prefix 332 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 333 const char **str, 334 int *str_len) 335 { 336 if (!pull_varint(cur, str_len)) 337 return 0; 338 339 *str = (const char *)cur->p; 340 341 if (!cursor_skip(cur, *str_len)) 342 return 0; 343 344 return 1; 345 } 346 347 // should be called after ndb_unpack_text_search_key_string. It continues 348 // the unpacking of a text search key if we've already started it. 349 static inline int 350 ndb_unpack_remaining_text_search_key(struct cursor *cur, 351 struct ndb_text_search_key *key) 352 { 353 int timestamp; 354 355 if (!pull_varint(cur, ×tamp)) 356 return 0; 357 358 if (!pull_varint(cur, &key->word_index)) 359 return 0; 360 361 key->timestamp = timestamp; 362 363 return 1; 364 } 365 366 // unpack a fulltext search key 367 // 368 // full version of string + unpack remaining. This is split up because text 369 // searching only requires to pull the string for prefix searching, and the 370 // remaining is optional 371 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 372 struct ndb_text_search_key *key) 373 { 374 struct cursor c; 375 make_cursor(p, p + len, &c); 376 377 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 378 return 0; 379 380 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 381 return 0; 382 383 return ndb_unpack_remaining_text_search_key(&c, key); 384 } 385 386 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 387 // `dst` and `src` are pointers to the destination and source strings, respectively. 388 // `n` is the maximum number of characters to copy. 389 static void lowercase_strncpy(char *dst, const char *src, int n) { 390 int j = 0, i = 0; 391 392 if (!dst || !src || n == 0) { 393 return; 394 } 395 396 while (src[i] != '\0' && j < n) { 397 dst[j++] = tolower(src[i++]); 398 } 399 400 // Null-terminate and fill the destination string 401 while (j < n) { 402 dst[j++] = '\0'; 403 } 404 } 405 406 int ndb_filter_init(struct ndb_filter *filter) 407 { 408 struct cursor cur; 409 int page_size, elem_pages, data_pages, buf_size; 410 411 page_size = 4096; // assuming this, not a big deal if we're wrong 412 elem_pages = NDB_FILTER_PAGES / 4; 413 data_pages = NDB_FILTER_PAGES - elem_pages; 414 buf_size = page_size * NDB_FILTER_PAGES; 415 416 unsigned char *buf = malloc(buf_size); 417 if (!buf) 418 return 0; 419 420 // init memory arena for the cursor 421 make_cursor(buf, buf + buf_size, &cur); 422 423 cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages); 424 cursor_slice(&cur, &filter->data_buf, page_size * data_pages); 425 426 // make sure we are fully allocated 427 assert(cur.p == cur.end); 428 429 // make sure elem_buf is the start of the buffer 430 assert(filter->elem_buf.start == cur.start); 431 432 filter->num_elements = 0; 433 filter->elements[0] = (struct ndb_filter_elements*) buf; 434 filter->current = NULL; 435 436 return 1; 437 } 438 439 void ndb_filter_reset(struct ndb_filter *filter) 440 { 441 filter->num_elements = 0; 442 filter->elem_buf.p = filter->elem_buf.start; 443 filter->data_buf.p = filter->data_buf.start; 444 filter->current = NULL; 445 } 446 447 void ndb_filter_free(struct ndb_filter *filter) 448 { 449 if (filter->elem_buf.start) 450 free(filter->elem_buf.start); 451 452 memset(filter, 0, sizeof(*filter)); 453 } 454 455 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 456 { 457 switch (field) { 458 case NDB_FILTER_IDS: return "ids"; 459 case NDB_FILTER_AUTHORS: return "authors"; 460 case NDB_FILTER_KINDS: return "kinds"; 461 case NDB_FILTER_GENERIC: return "generic"; 462 case NDB_FILTER_SINCE: return "since"; 463 case NDB_FILTER_UNTIL: return "until"; 464 case NDB_FILTER_LIMIT: return "limit"; 465 } 466 467 return "unknown"; 468 } 469 470 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char generic) 471 { 472 int i; 473 struct ndb_filter_elements *els, *el; 474 475 if (filter->current) { 476 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 477 return 0; 478 } 479 480 // you can only start and end fields once 481 for (i = 0; i < filter->num_elements; i++) { 482 el = filter->elements[i]; 483 if (el->field.type == field) { 484 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 485 ndb_filter_field_name(field)); 486 return 0; 487 } 488 } 489 490 els = (struct ndb_filter_elements *) filter->elem_buf.p ; 491 filter->current = els; 492 493 // advance elem buffer to the variable data section 494 if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 495 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 496 ndb_filter_field_name(field)); 497 return 0; 498 } 499 500 els->field.type = field; 501 els->field.generic = generic; 502 els->field.elem_type = 0; 503 els->count = 0; 504 505 return 1; 506 } 507 508 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 509 { 510 return ndb_filter_start_field_impl(filter, field, 0); 511 } 512 513 int ndb_filter_start_generic_field(struct ndb_filter *filter, char tag) 514 { 515 return ndb_filter_start_field_impl(filter, NDB_FILTER_GENERIC, tag); 516 } 517 518 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 519 { 520 unsigned char *data; 521 const char *str; 522 523 if (!filter->current) 524 return 0; 525 526 data = filter->data_buf.p; 527 528 switch (filter->current->field.type) { 529 case NDB_FILTER_IDS: 530 case NDB_FILTER_AUTHORS: 531 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 532 return 0; 533 el.id = data; 534 break; 535 case NDB_FILTER_KINDS: 536 break; 537 case NDB_FILTER_SINCE: 538 case NDB_FILTER_UNTIL: 539 case NDB_FILTER_LIMIT: 540 // only one allowed for since/until 541 if (filter->current->count != 0) 542 return 0; 543 break; 544 case NDB_FILTER_GENERIC: 545 str = (const char *)filter->data_buf.p; 546 if (!cursor_push_c_str(&filter->data_buf, el.string)) 547 return 0; 548 // push a pointer of the string in the databuf as an element 549 el.string = str; 550 break; 551 } 552 553 if (!cursor_push(&filter->elem_buf, (unsigned char*)&el, sizeof(el))) 554 return 0; 555 556 filter->current->count++; 557 558 return 1; 559 } 560 561 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 562 enum ndb_generic_element_type elem_type) 563 { 564 enum ndb_generic_element_type current_elem_type; 565 566 if (!filter->current) 567 return 0; 568 569 current_elem_type = filter->current->field.elem_type; 570 571 // element types must be uniform 572 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 573 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 574 return 0; 575 } 576 577 filter->current->field.elem_type = elem_type; 578 579 return 1; 580 } 581 582 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 583 { 584 union ndb_filter_element el; 585 586 if (!filter->current) 587 return 0; 588 589 // only generic queries are allowed to have strings 590 switch (filter->current->field.type) { 591 case NDB_FILTER_SINCE: 592 case NDB_FILTER_UNTIL: 593 case NDB_FILTER_LIMIT: 594 case NDB_FILTER_IDS: 595 case NDB_FILTER_AUTHORS: 596 case NDB_FILTER_KINDS: 597 return 0; 598 case NDB_FILTER_GENERIC: 599 break; 600 } 601 602 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 603 return 0; 604 605 el.string = str; 606 return ndb_filter_add_element(filter, el); 607 } 608 609 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 610 { 611 union ndb_filter_element el; 612 if (!filter->current) 613 return 0; 614 615 switch (filter->current->field.type) { 616 case NDB_FILTER_IDS: 617 case NDB_FILTER_AUTHORS: 618 case NDB_FILTER_GENERIC: 619 return 0; 620 case NDB_FILTER_KINDS: 621 case NDB_FILTER_SINCE: 622 case NDB_FILTER_UNTIL: 623 case NDB_FILTER_LIMIT: 624 break; 625 } 626 627 el.integer = integer; 628 629 return ndb_filter_add_element(filter, el); 630 } 631 632 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 633 { 634 union ndb_filter_element el; 635 636 if (!filter->current) 637 return 0; 638 639 // only certain filter types allow pushing id elements 640 switch (filter->current->field.type) { 641 case NDB_FILTER_SINCE: 642 case NDB_FILTER_UNTIL: 643 case NDB_FILTER_LIMIT: 644 return 0; 645 case NDB_FILTER_IDS: 646 case NDB_FILTER_AUTHORS: 647 case NDB_FILTER_KINDS: 648 case NDB_FILTER_GENERIC: 649 break; 650 } 651 652 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 653 return 0; 654 655 // this is needed so that generic filters know its an id 656 el.id = id; 657 658 return ndb_filter_add_element(filter, el); 659 } 660 661 // TODO: build a hashtable so this is O(1) 662 static int ndb_generic_filter_matches(struct ndb_filter_elements *els, 663 struct ndb_note *note) 664 { 665 int i; 666 union ndb_filter_element el; 667 struct ndb_iterator iter, *it = &iter; 668 struct ndb_str str; 669 670 ndb_tags_iterate_start(note, it); 671 672 while (ndb_tags_iterate_next(it)) { 673 // we're looking for tags with 2 or more entries: ["p", id], etc 674 if (it->tag->count < 2) 675 continue; 676 677 str = ndb_note_str(note, &it->tag->strs[0]); 678 679 // we only care about packed strings (single char, etc) 680 if (str.flag != NDB_PACKED_STR) 681 continue; 682 683 // do we have #e matching e (or p, etc) 684 if (str.str[0] != els->field.generic || str.str[1] != 0) 685 continue; 686 687 str = ndb_note_str(note, &it->tag->strs[1]); 688 689 switch (els->field.elem_type) { 690 case NDB_ELEMENT_ID: 691 // if our filter element type is an id, then we 692 // expect a packed id in the tag, otherwise skip 693 if (str.flag != NDB_PACKED_ID) 694 continue; 695 break; 696 case NDB_ELEMENT_STRING: 697 // if our filter element type is a string, then 698 // we should not expect an id 699 if (str.flag == NDB_PACKED_ID) 700 continue; 701 break; 702 case NDB_ELEMENT_UNKNOWN: 703 default: 704 // For some reason the element type is not set. It's 705 // possible nothing was added to the generic filter? 706 // Let's just fail here and log a note for debugging 707 fprintf(stderr, "UNUSUAL ndb_generic_filter_matches: have unknown element type %d\n", els->field.elem_type); 708 return 0; 709 } 710 711 for (i = 0; i < els->count; i++) { 712 el = els->elements[i]; 713 switch (els->field.elem_type) { 714 case NDB_ELEMENT_ID: 715 if (!memcmp(el.id, str.id, 32)) 716 return 1; 717 break; 718 case NDB_ELEMENT_STRING: 719 if (!strcmp(el.string, str.str)) 720 return 1; 721 break; 722 case NDB_ELEMENT_UNKNOWN: 723 return 0; 724 } 725 } 726 } 727 728 return 0; 729 } 730 731 // returns 1 if a filter matches a note 732 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 733 { 734 int i, j; 735 struct ndb_filter_elements *els; 736 737 for (i = 0; i < filter->num_elements; i++) { 738 els = filter->elements[i]; 739 740 switch (els->field.type) { 741 case NDB_FILTER_KINDS: 742 for (j = 0; j < els->count; j++) { 743 if ((unsigned int)els->elements[j].integer == note->kind) 744 goto cont; 745 } 746 break; 747 // TODO: add filter hashtable for large id lists 748 case NDB_FILTER_IDS: 749 for (j = 0; j < els->count; j++) { 750 if (!memcmp(els->elements[j].id, note->id, 32)) 751 goto cont; 752 } 753 break; 754 case NDB_FILTER_AUTHORS: 755 for (j = 0; j < els->count; j++) { 756 if (!memcmp(els->elements[j].id, note->pubkey, 32)) 757 goto cont; 758 } 759 break; 760 case NDB_FILTER_GENERIC: 761 if (ndb_generic_filter_matches(els, note)) 762 continue; 763 break; 764 case NDB_FILTER_SINCE: 765 assert(els->count == 1); 766 if (note->created_at >= els->elements[0].integer) 767 continue; 768 break; 769 case NDB_FILTER_UNTIL: 770 assert(els->count == 1); 771 if (note->created_at < els->elements[0].integer) 772 continue; 773 case NDB_FILTER_LIMIT: 774 cont: 775 continue; 776 } 777 778 // all need to match 779 return 0; 780 } 781 782 return 1; 783 } 784 785 void ndb_filter_end_field(struct ndb_filter *filter) 786 { 787 filter->elements[filter->num_elements++] = filter->current; 788 filter->current = NULL; 789 } 790 791 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 792 uint64_t timestamp, const char *search) 793 { 794 memcpy(key->id, id, 32); 795 key->timestamp = timestamp; 796 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 797 key->search[sizeof(key->search) - 1] = '\0'; 798 } 799 800 static int ndb_write_profile_search_index(struct ndb_txn *txn, 801 struct ndb_search_key *index_key, 802 uint64_t profile_key) 803 { 804 int rc; 805 MDB_val key, val; 806 807 key.mv_data = index_key; 808 key.mv_size = sizeof(*index_key); 809 val.mv_data = &profile_key; 810 val.mv_size = sizeof(profile_key); 811 812 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 813 &key, &val, 0))) 814 { 815 ndb_debug("ndb_write_profile_search_index failed: %s\n", 816 mdb_strerror(rc)); 817 return 0; 818 } 819 820 return 1; 821 } 822 823 824 // map usernames and display names to profile keys for user searching 825 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 826 struct ndb_note *note, 827 uint64_t profile_key, 828 void *profile_root) 829 { 830 struct ndb_search_key index; 831 NdbProfileRecord_table_t profile_record; 832 NdbProfile_table_t profile; 833 834 profile_record = NdbProfileRecord_as_root(profile_root); 835 profile = NdbProfileRecord_profile_get(profile_record); 836 837 const char *name = NdbProfile_name_get(profile); 838 const char *display_name = NdbProfile_display_name_get(profile); 839 840 // words + pubkey + created 841 if (name) { 842 ndb_make_search_key(&index, note->pubkey, note->created_at, 843 name); 844 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 845 return 0; 846 } 847 848 if (display_name) { 849 // don't write the same name/display_name twice 850 if (name && !strcmp(display_name, name)) { 851 return 1; 852 } 853 ndb_make_search_key(&index, note->pubkey, note->created_at, 854 display_name); 855 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 856 return 0; 857 } 858 859 return 1; 860 } 861 862 863 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 864 { 865 txn->lmdb = &ndb->lmdb; 866 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 867 if (!txn->lmdb->env) 868 return 0; 869 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 870 } 871 872 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 873 { 874 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 875 } 876 877 // this should only be used in migrations, etc 878 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 879 { 880 return _ndb_begin_query(ndb, txn, 0); 881 } 882 883 884 // Migrations 885 // 886 887 static int ndb_migrate_user_search_indices(struct ndb *ndb) 888 { 889 int rc; 890 MDB_cursor *cur; 891 MDB_val k, v; 892 void *profile_root; 893 NdbProfileRecord_table_t record; 894 struct ndb_txn txn; 895 struct ndb_note *note; 896 uint64_t note_key, profile_key; 897 size_t len; 898 int count; 899 900 if (!ndb_begin_rw_query(ndb, &txn)) { 901 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n"); 902 return 0; 903 } 904 905 if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { 906 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 907 return 0; 908 } 909 910 count = 0; 911 912 // loop through all profiles and write search indices 913 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 914 profile_root = v.mv_data; 915 profile_key = *((uint64_t*)k.mv_data); 916 record = NdbProfileRecord_as_root(profile_root); 917 note_key = NdbProfileRecord_note_key(record); 918 note = ndb_get_note_by_key(&txn, note_key, &len); 919 920 if (note == NULL) { 921 fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n"); 922 return 0; 923 } 924 925 if (!ndb_write_profile_search_indices(&txn, note, profile_key, 926 profile_root)) { 927 928 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 929 return 0; 930 } 931 932 count++; 933 } 934 935 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 936 937 mdb_cursor_close(cur); 938 939 ndb_end_query(&txn); 940 941 return 1; 942 } 943 944 static int ndb_migrate_lower_user_search_indices(struct ndb *ndb) 945 { 946 MDB_txn *txn; 947 948 if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) { 949 fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n"); 950 return 0; 951 } 952 953 // just drop the search db so we can rebuild it 954 if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) { 955 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 956 return 0; 957 } 958 959 mdb_txn_commit(txn); 960 961 return ndb_migrate_user_search_indices(ndb); 962 } 963 964 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 965 966 967 int ndb_db_version(struct ndb *ndb) 968 { 969 int rc; 970 uint64_t version, version_key; 971 MDB_val k, v; 972 MDB_txn *txn; 973 974 version_key = NDB_META_KEY_VERSION; 975 k.mv_data = &version_key; 976 k.mv_size = sizeof(version_key); 977 978 if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) { 979 fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc); 980 return -1; 981 } 982 983 if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) { 984 version = -1; 985 } else { 986 if (v.mv_size != 8) { 987 fprintf(stderr, "run_migrations: invalid version size?"); 988 return 0; 989 } 990 version = *((uint64_t*)v.mv_data); 991 } 992 993 mdb_txn_abort(txn); 994 return version; 995 } 996 997 // custom kind+timestamp comparison function. This is used by lmdb to perform 998 // b+ tree searches over the kind+timestamp index 999 static int ndb_u64_tsid_compare(const MDB_val *a, const MDB_val *b) 1000 { 1001 struct ndb_u64_tsid *tsa, *tsb; 1002 tsa = a->mv_data; 1003 tsb = b->mv_data; 1004 1005 if (tsa->u64 < tsb->u64) 1006 return -1; 1007 else if (tsa->u64 > tsb->u64) 1008 return 1; 1009 1010 if (tsa->timestamp < tsb->timestamp) 1011 return -1; 1012 else if (tsa->timestamp > tsb->timestamp) 1013 return 1; 1014 1015 return 0; 1016 } 1017 1018 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1019 { 1020 struct ndb_tsid *tsa, *tsb; 1021 MDB_val a2 = *a, b2 = *b; 1022 1023 a2.mv_size = sizeof(tsa->id); 1024 b2.mv_size = sizeof(tsb->id); 1025 1026 int cmp = mdb_cmp_memn(&a2, &b2); 1027 if (cmp) return cmp; 1028 1029 tsa = a->mv_data; 1030 tsb = b->mv_data; 1031 1032 if (tsa->timestamp < tsb->timestamp) 1033 return -1; 1034 else if (tsa->timestamp > tsb->timestamp) 1035 return 1; 1036 return 0; 1037 } 1038 1039 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1040 { 1041 memcpy(key->id, id, 32); 1042 key->timestamp = 0; 1043 } 1044 1045 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1046 uint64_t timestamp) 1047 { 1048 memcpy(key->id, id, 32); 1049 key->timestamp = timestamp; 1050 } 1051 1052 static inline void ndb_u64_tsid_init(struct ndb_u64_tsid *key, uint64_t integer, 1053 uint64_t timestamp) 1054 { 1055 key->u64 = integer; 1056 key->timestamp = timestamp; 1057 } 1058 1059 // useful for range-searching for the latest key with a clustered created_at timen 1060 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1061 { 1062 memcpy(key->id, id, 32); 1063 key->timestamp = UINT64_MAX; 1064 } 1065 1066 enum ndb_ingester_msgtype { 1067 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1068 NDB_INGEST_QUIT, // kill ingester thread immediately 1069 }; 1070 1071 struct ndb_ingester_event { 1072 char *json; 1073 unsigned client : 1; // ["EVENT", {...}] messages 1074 unsigned len : 31; 1075 }; 1076 1077 struct ndb_writer_note { 1078 struct ndb_note *note; 1079 size_t note_len; 1080 }; 1081 1082 struct ndb_writer_profile { 1083 struct ndb_writer_note note; 1084 struct ndb_profile_record_builder record; 1085 }; 1086 1087 struct ndb_ingester_msg { 1088 enum ndb_ingester_msgtype type; 1089 union { 1090 struct ndb_ingester_event event; 1091 }; 1092 }; 1093 1094 struct ndb_writer_ndb_meta { 1095 // these are 64 bit because I'm paranoid of db-wide alignment issues 1096 uint64_t version; 1097 }; 1098 1099 // Used in the writer thread when writing ndb_profile_fetch_record's 1100 // kv = pubkey: recor 1101 struct ndb_writer_last_fetch { 1102 unsigned char pubkey[32]; 1103 uint64_t fetched_at; 1104 }; 1105 1106 // The different types of messages that the writer thread can write to the 1107 // database 1108 struct ndb_writer_msg { 1109 enum ndb_writer_msgtype type; 1110 union { 1111 struct ndb_writer_note note; 1112 struct ndb_writer_profile profile; 1113 struct ndb_writer_ndb_meta ndb_meta; 1114 struct ndb_writer_last_fetch last_fetch; 1115 }; 1116 }; 1117 1118 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 1119 struct ndb_writer_msg *msg) 1120 { 1121 return prot_queue_push(&writer->inbox, msg); 1122 } 1123 1124 static int ndb_migrate_utf8_profile_names(struct ndb *ndb) 1125 { 1126 int rc; 1127 MDB_cursor *cur; 1128 MDB_val k, v; 1129 void *profile_root; 1130 NdbProfileRecord_table_t record; 1131 struct ndb_txn txn; 1132 struct ndb_note *note, *copied_note; 1133 uint64_t note_key; 1134 size_t len; 1135 int count, failed; 1136 struct ndb_writer_msg out; 1137 1138 if (!ndb_begin_rw_query(ndb, &txn)) { 1139 fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n"); 1140 return 0; 1141 } 1142 1143 if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { 1144 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 1145 return 0; 1146 } 1147 1148 count = 0; 1149 failed = 0; 1150 1151 // loop through all profiles and write search indices 1152 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1153 profile_root = v.mv_data; 1154 record = NdbProfileRecord_as_root(profile_root); 1155 note_key = NdbProfileRecord_note_key(record); 1156 note = ndb_get_note_by_key(&txn, note_key, &len); 1157 1158 if (note == NULL) { 1159 fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n"); 1160 return 0; 1161 } 1162 1163 struct ndb_profile_record_builder *b = &out.profile.record; 1164 1165 // reprocess profile 1166 if (!ndb_process_profile_note(note, b)) { 1167 failed++; 1168 continue; 1169 } 1170 1171 // the writer needs to own this note, and its expected to free it 1172 copied_note = malloc(len); 1173 memcpy(copied_note, note, len); 1174 1175 out.type = NDB_WRITER_PROFILE; 1176 out.profile.note.note = copied_note; 1177 out.profile.note.note_len = len; 1178 1179 ndb_writer_queue_msg(&ndb->writer, &out); 1180 1181 count++; 1182 } 1183 1184 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 1185 1186 if (failed != 0) { 1187 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 1188 } 1189 1190 mdb_cursor_close(cur); 1191 1192 ndb_end_query(&txn); 1193 1194 return 1; 1195 } 1196 1197 static struct ndb_migration MIGRATIONS[] = { 1198 { .fn = ndb_migrate_user_search_indices }, 1199 { .fn = ndb_migrate_lower_user_search_indices }, 1200 { .fn = ndb_migrate_utf8_profile_names } 1201 }; 1202 1203 1204 int ndb_end_query(struct ndb_txn *txn) 1205 { 1206 // this works on read or write queries. 1207 return mdb_txn_commit(txn->mdb_txn) == 0; 1208 } 1209 1210 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 1211 unsigned char sig[64]) 1212 { 1213 secp256k1_xonly_pubkey xonly_pubkey; 1214 int ok; 1215 1216 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 1217 pubkey) != 0; 1218 if (!ok) return 0; 1219 1220 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 1221 &xonly_pubkey) > 0; 1222 if (!ok) return 0; 1223 1224 return 1; 1225 } 1226 1227 static inline int ndb_writer_queue_msgs(struct ndb_writer *writer, 1228 struct ndb_writer_msg *msgs, 1229 int num_msgs) 1230 { 1231 return prot_queue_push_all(&writer->inbox, msgs, num_msgs); 1232 } 1233 1234 static int ndb_writer_queue_note(struct ndb_writer *writer, 1235 struct ndb_note *note, size_t note_len) 1236 { 1237 struct ndb_writer_msg msg; 1238 msg.type = NDB_WRITER_NOTE; 1239 1240 msg.note.note = note; 1241 msg.note.note_len = note_len; 1242 1243 return prot_queue_push(&writer->inbox, &msg); 1244 } 1245 1246 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 1247 const unsigned char *pubkey, 1248 uint64_t fetched_at) 1249 { 1250 int rc; 1251 MDB_val key, val; 1252 1253 key.mv_data = (unsigned char*)pubkey; 1254 key.mv_size = 32; 1255 val.mv_data = &fetched_at; 1256 val.mv_size = sizeof(fetched_at); 1257 1258 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 1259 &key, &val, 0))) 1260 { 1261 ndb_debug("write version to ndb_meta failed: %s\n", 1262 mdb_strerror(rc)); 1263 return; 1264 } 1265 1266 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 1267 } 1268 1269 1270 // We just received a profile that we haven't processed yet, but it could 1271 // be an older one! Make sure we only write last fetched profile if it's a new 1272 // one 1273 // 1274 // To do this, we first check the latest profile in the database. If the 1275 // created_date for this profile note is newer, then we write a 1276 // last_profile_fetch record, otherwise we do not. 1277 // 1278 // WARNING: This function is only valid when called from the writer thread 1279 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 1280 struct ndb_note *note) 1281 { 1282 size_t len; 1283 uint64_t profile_key, note_key; 1284 void *root; 1285 struct ndb_note *last_profile; 1286 NdbProfileRecord_table_t record; 1287 1288 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 1289 record = NdbProfileRecord_as_root(root); 1290 note_key = NdbProfileRecord_note_key(record); 1291 last_profile = ndb_get_note_by_key(txn, note_key, &len); 1292 if (last_profile == NULL) { 1293 return 0; 1294 } 1295 1296 // found profile, let's see if it's newer than ours 1297 if (note->created_at > last_profile->created_at) { 1298 // this is a new profile note, record last fetched time 1299 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1300 } 1301 } else { 1302 // couldn't fetch profile. record last fetched time 1303 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1304 } 1305 1306 return 1; 1307 } 1308 1309 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 1310 uint64_t fetched_at) 1311 { 1312 struct ndb_writer_msg msg; 1313 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 1314 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 1315 msg.last_fetch.fetched_at = fetched_at; 1316 1317 return ndb_writer_queue_msg(&ndb->writer, &msg); 1318 } 1319 1320 // get some value based on a clustered id key 1321 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 1322 MDB_val *val) 1323 { 1324 MDB_val k, v; 1325 MDB_cursor *cur; 1326 int success = 0, rc; 1327 struct ndb_tsid tsid; 1328 1329 // position at the most recent 1330 ndb_tsid_high(&tsid, id); 1331 1332 k.mv_data = &tsid; 1333 k.mv_size = sizeof(tsid); 1334 1335 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 1336 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 1337 return 0; 1338 } 1339 1340 // Position cursor at the next key greater than or equal to the specified key 1341 if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) { 1342 // Failed :(. It could be the last element? 1343 if (mdb_cursor_get(cur, &k, &v, MDB_LAST)) 1344 goto cleanup; 1345 } else { 1346 // if set range worked and our key exists, it should be 1347 // the one right before this one 1348 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 1349 goto cleanup; 1350 } 1351 1352 if (memcmp(k.mv_data, id, 32) == 0) { 1353 *val = v; 1354 success = 1; 1355 } 1356 1357 cleanup: 1358 mdb_cursor_close(cur); 1359 return success; 1360 } 1361 1362 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 1363 enum ndb_dbs store, size_t *len) 1364 { 1365 MDB_val k, v; 1366 1367 k.mv_data = &key; 1368 k.mv_size = sizeof(key); 1369 1370 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 1371 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 1372 return NULL; 1373 } 1374 1375 if (len) 1376 *len = v.mv_size; 1377 1378 return v.mv_data; 1379 } 1380 1381 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 1382 enum ndb_dbs store, const unsigned char *pk, 1383 size_t *len, uint64_t *primkey) 1384 { 1385 MDB_val k, v; 1386 void *res = NULL; 1387 if (len) 1388 *len = 0; 1389 1390 if (!ndb_get_tsid(txn, ind, pk, &k)) { 1391 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 1392 return 0; 1393 } 1394 1395 if (primkey) 1396 *primkey = *(uint64_t*)k.mv_data; 1397 1398 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 1399 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 1400 return 0; 1401 } 1402 1403 res = v.mv_data; 1404 assert(((uint64_t)res % 4) == 0); 1405 if (len) 1406 *len = v.mv_size; 1407 return res; 1408 } 1409 1410 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 1411 { 1412 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 1413 } 1414 1415 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 1416 { 1417 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key); 1418 } 1419 1420 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 1421 enum ndb_dbs db, 1422 const unsigned char *id) 1423 { 1424 MDB_val k; 1425 1426 if (!ndb_get_tsid(txn, db, id, &k)) 1427 return 0; 1428 1429 return *(uint32_t*)k.mv_data; 1430 } 1431 1432 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 1433 { 1434 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 1435 } 1436 1437 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 1438 { 1439 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 1440 } 1441 1442 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 1443 { 1444 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 1445 } 1446 1447 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 1448 { 1449 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 1450 } 1451 1452 uint64_t 1453 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 1454 { 1455 MDB_val k, v; 1456 1457 k.mv_data = (unsigned char*)pubkey; 1458 k.mv_size = 32; 1459 1460 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 1461 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 1462 return 0; 1463 } 1464 1465 return *((uint64_t*)v.mv_data); 1466 } 1467 1468 1469 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 1470 { 1471 MDB_val val; 1472 1473 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 1474 return 0; 1475 1476 return 1; 1477 } 1478 1479 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 1480 MDB_txn *mdb_txn) 1481 { 1482 txn->lmdb = lmdb; 1483 txn->mdb_txn = mdb_txn; 1484 } 1485 1486 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 1487 { 1488 unsigned char id[32]; 1489 struct ndb_ingest_controller *c = data; 1490 struct ndb_txn txn; 1491 1492 hex_decode(hexid, 64, id, sizeof(id)); 1493 1494 // let's see if we already have it 1495 1496 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 1497 if (!ndb_has_note(&txn, id)) 1498 return NDB_IDRES_CONT; 1499 1500 return NDB_IDRES_STOP; 1501 } 1502 1503 static int ndbprofile_parse_json(flatcc_builder_t *B, 1504 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 1505 { 1506 flatcc_json_parser_t parser, *ctx = &parser; 1507 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 1508 1509 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 1510 return 0; 1511 1512 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 1513 if (ctx->error) 1514 return 0; 1515 1516 if (!flatcc_builder_end_buffer(B, *profile)) 1517 return 0; 1518 1519 ctx->end_loc = buf; 1520 1521 1522 return 1; 1523 } 1524 1525 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 1526 { 1527 b->builder = malloc(sizeof(*b->builder)); 1528 b->flatbuf = NULL; 1529 } 1530 1531 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 1532 { 1533 if (b->builder) 1534 free(b->builder); 1535 if (b->flatbuf) 1536 free(b->flatbuf); 1537 1538 b->builder = NULL; 1539 b->flatbuf = NULL; 1540 } 1541 1542 int ndb_process_profile_note(struct ndb_note *note, 1543 struct ndb_profile_record_builder *profile) 1544 { 1545 int res; 1546 1547 NdbProfile_ref_t profile_table; 1548 flatcc_builder_t *builder; 1549 1550 ndb_profile_record_builder_init(profile); 1551 builder = profile->builder; 1552 flatcc_builder_init(builder); 1553 1554 NdbProfileRecord_start_as_root(builder); 1555 1556 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 1557 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 1558 note->content_length, 1559 flatcc_json_parser_f_skip_unknown, 1560 &profile_table))) 1561 { 1562 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 1563 note->content_length, ndb_note_content(note)); 1564 ndb_profile_record_builder_free(profile); 1565 return 0; 1566 } 1567 1568 uint64_t received_at = time(NULL); 1569 const char *lnurl = "fixme"; 1570 1571 NdbProfileRecord_profile_add(builder, profile_table); 1572 NdbProfileRecord_received_at_add(builder, received_at); 1573 1574 flatcc_builder_ref_t lnurl_off; 1575 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 1576 1577 NdbProfileRecord_lnurl_add(builder, lnurl_off); 1578 1579 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 1580 return 1; 1581 } 1582 1583 static int ndb_ingester_process_note(secp256k1_context *ctx, 1584 struct ndb_note *note, 1585 size_t note_size, 1586 struct ndb_writer_msg *out, 1587 struct ndb_ingester *ingester) 1588 { 1589 enum ndb_ingest_filter_action action; 1590 action = NDB_INGEST_ACCEPT; 1591 1592 if (ingester->filter) 1593 action = ingester->filter(ingester->filter_context, note); 1594 1595 if (action == NDB_INGEST_REJECT) 1596 return 0; 1597 1598 // some special situations we might want to skip sig validation, 1599 // like during large imports 1600 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 1601 // if we're skipping validation we don't need to verify 1602 } else { 1603 // verify! If it's an invalid note we don't need to 1604 // bother writing it to the database 1605 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 1606 ndb_debug("signature verification failed\n"); 1607 return 0; 1608 } 1609 } 1610 1611 // we didn't find anything. let's send it 1612 // to the writer thread 1613 note = realloc(note, note_size); 1614 assert(((uint64_t)note % 4) == 0); 1615 1616 if (note->kind == 0) { 1617 struct ndb_profile_record_builder *b = 1618 &out->profile.record; 1619 1620 ndb_process_profile_note(note, b); 1621 1622 out->type = NDB_WRITER_PROFILE; 1623 out->profile.note.note = note; 1624 out->profile.note.note_len = note_size; 1625 } else { 1626 out->type = NDB_WRITER_NOTE; 1627 out->note.note = note; 1628 out->note.note_len = note_size; 1629 } 1630 1631 return 1; 1632 } 1633 1634 1635 static int ndb_ingester_process_event(secp256k1_context *ctx, 1636 struct ndb_ingester *ingester, 1637 struct ndb_ingester_event *ev, 1638 struct ndb_writer_msg *out, 1639 MDB_txn *read_txn 1640 ) 1641 { 1642 struct ndb_tce tce; 1643 struct ndb_fce fce; 1644 struct ndb_note *note; 1645 struct ndb_ingest_controller controller; 1646 struct ndb_id_cb cb; 1647 void *buf; 1648 int ok; 1649 size_t bufsize, note_size; 1650 1651 ok = 0; 1652 1653 // we will use this to check if we already have it in the DB during 1654 // ID parsing 1655 controller.read_txn = read_txn; 1656 controller.lmdb = ingester->writer->lmdb; 1657 cb.fn = ndb_ingester_json_controller; 1658 cb.data = &controller; 1659 1660 // since we're going to be passing this allocated note to a different 1661 // thread, we can't use thread-local buffers. just allocate a block 1662 bufsize = max(ev->len * 8.0, 4096); 1663 buf = malloc(bufsize); 1664 if (!buf) { 1665 ndb_debug("couldn't malloc buf\n"); 1666 return 0; 1667 } 1668 1669 note_size = 1670 ev->client ? 1671 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 1672 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 1673 1674 if (note_size == -42) { 1675 // we already have this! 1676 //ndb_debug("already have id??\n"); 1677 goto cleanup; 1678 } else if (note_size == 0) { 1679 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 1680 goto cleanup; 1681 } 1682 1683 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 1684 1685 if (ev->client) { 1686 switch (fce.evtype) { 1687 case NDB_FCE_EVENT: 1688 note = fce.event.note; 1689 if (note != buf) { 1690 ndb_debug("note buffer not equal to malloc'd buffer\n"); 1691 goto cleanup; 1692 } 1693 1694 if (!ndb_ingester_process_note(ctx, note, note_size, 1695 out, ingester)) { 1696 goto cleanup; 1697 } else { 1698 // we're done with the original json, free it 1699 free(ev->json); 1700 return 1; 1701 } 1702 } 1703 } else { 1704 switch (tce.evtype) { 1705 case NDB_TCE_NOTICE: goto cleanup; 1706 case NDB_TCE_EOSE: goto cleanup; 1707 case NDB_TCE_OK: goto cleanup; 1708 case NDB_TCE_EVENT: 1709 note = tce.event.note; 1710 if (note != buf) { 1711 ndb_debug("note buffer not equal to malloc'd buffer\n"); 1712 goto cleanup; 1713 } 1714 1715 if (!ndb_ingester_process_note(ctx, note, note_size, 1716 out, ingester)) { 1717 goto cleanup; 1718 } else { 1719 // we're done with the original json, free it 1720 free(ev->json); 1721 return 1; 1722 } 1723 } 1724 } 1725 1726 1727 cleanup: 1728 free(ev->json); 1729 free(buf); 1730 1731 return ok; 1732 } 1733 1734 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 1735 { 1736 MDB_cursor *mc; 1737 MDB_val key, val; 1738 1739 if (mdb_cursor_open(txn, db, &mc)) 1740 return 0; 1741 1742 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 1743 mdb_cursor_close(mc); 1744 return 0; 1745 } 1746 1747 mdb_cursor_close(mc); 1748 1749 assert(key.mv_size == 8); 1750 return *((uint64_t*)key.mv_data); 1751 } 1752 1753 // 1754 // make a search key meant for user queries without any other note info 1755 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 1756 { 1757 memset(key->id, 0, sizeof(key->id)); 1758 key->timestamp = 0; 1759 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1760 key->search[sizeof(key->search) - 1] = '\0'; 1761 } 1762 1763 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 1764 { 1765 int rc; 1766 struct ndb_search_key s; 1767 MDB_val k, v; 1768 search->cursor = NULL; 1769 1770 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 1771 1772 ndb_make_search_key_low(&s, query); 1773 1774 k.mv_data = &s; 1775 k.mv_size = sizeof(s); 1776 1777 if ((rc = mdb_cursor_open(txn->mdb_txn, 1778 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1779 cursor))) { 1780 printf("search_profile: cursor opened failed: %s\n", 1781 mdb_strerror(rc)); 1782 return 0; 1783 } 1784 1785 // Position cursor at the next key greater than or equal to the specified key 1786 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 1787 printf("search_profile: cursor get failed\n"); 1788 goto cleanup; 1789 } else { 1790 search->key = k.mv_data; 1791 assert(v.mv_size == 8); 1792 search->profile_key = *((uint64_t*)v.mv_data); 1793 return 1; 1794 } 1795 1796 cleanup: 1797 mdb_cursor_close(search->cursor); 1798 search->cursor = NULL; 1799 return 0; 1800 } 1801 1802 void ndb_search_profile_end(struct ndb_search *search) 1803 { 1804 if (search->cursor) 1805 mdb_cursor_close(search->cursor); 1806 } 1807 1808 int ndb_search_profile_next(struct ndb_search *search) 1809 { 1810 int rc; 1811 MDB_val k, v; 1812 unsigned char *init_id; 1813 1814 init_id = search->key->id; 1815 k.mv_data = search->key; 1816 k.mv_size = sizeof(*search->key); 1817 1818 retry: 1819 if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) { 1820 ndb_debug("ndb_search_profile_next: %s\n", 1821 mdb_strerror(rc)); 1822 return 0; 1823 } else { 1824 search->key = k.mv_data; 1825 assert(v.mv_size == 8); 1826 search->profile_key = *((uint64_t*)v.mv_data); 1827 1828 // skip duplicate pubkeys 1829 if (!memcmp(init_id, search->key->id, 32)) 1830 goto retry; 1831 } 1832 1833 return 1; 1834 } 1835 1836 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 1837 { 1838 int cmp; 1839 struct ndb_search_key *ska, *skb; 1840 1841 ska = a->mv_data; 1842 skb = b->mv_data; 1843 1844 MDB_val a2 = *a; 1845 MDB_val b2 = *b; 1846 1847 a2.mv_data = ska->search; 1848 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 1849 1850 cmp = mdb_cmp_memn(&a2, &b2); 1851 if (cmp) return cmp; 1852 1853 if (ska->timestamp < skb->timestamp) 1854 return -1; 1855 else if (ska->timestamp > skb->timestamp) 1856 return 1; 1857 1858 return 0; 1859 } 1860 1861 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 1862 1863 { 1864 MDB_val key, val; 1865 int rc; 1866 struct ndb_tsid tsid; 1867 MDB_dbi pk_db; 1868 1869 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 1870 1871 // write profile_pk + created_at index 1872 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 1873 1874 key.mv_data = &tsid; 1875 key.mv_size = sizeof(tsid); 1876 val.mv_data = &profile_key; 1877 val.mv_size = sizeof(profile_key); 1878 1879 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 1880 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 1881 profile_key, mdb_strerror(rc)); 1882 return 0; 1883 } 1884 1885 return 1; 1886 } 1887 1888 static int ndb_write_profile(struct ndb_txn *txn, 1889 struct ndb_writer_profile *profile, 1890 uint64_t note_key) 1891 { 1892 uint64_t profile_key; 1893 struct ndb_note *note; 1894 void *flatbuf; 1895 size_t flatbuf_len; 1896 int rc; 1897 1898 MDB_val key, val; 1899 MDB_dbi profile_db; 1900 1901 note = profile->note.note; 1902 1903 // add note_key to profile record 1904 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 1905 NdbProfileRecord_end_as_root(profile->record.builder); 1906 1907 flatbuf = profile->record.flatbuf = 1908 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 1909 1910 assert(((uint64_t)flatbuf % 8) == 0); 1911 1912 // TODO: this may not be safe!? 1913 flatbuf_len = (flatbuf_len + 7) & ~7; 1914 1915 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 1916 1917 // get dbs 1918 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 1919 1920 // get new key 1921 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 1922 1923 // write profile to profile store 1924 key.mv_data = &profile_key; 1925 key.mv_size = sizeof(profile_key); 1926 val.mv_data = flatbuf; 1927 val.mv_size = flatbuf_len; 1928 1929 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 1930 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 1931 return 0; 1932 } 1933 1934 // write last fetched record 1935 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 1936 ndb_debug("failed to write last profile fetched record\n"); 1937 } 1938 1939 // write profile pubkey index 1940 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 1941 ndb_debug("failed to write profile pubkey index\n"); 1942 return 0; 1943 } 1944 1945 // write name, display_name profile search indices 1946 if (!ndb_write_profile_search_indices(txn, note, profile_key, 1947 flatbuf)) { 1948 ndb_debug("failed to write profile search indices\n"); 1949 return 0; 1950 } 1951 1952 return 1; 1953 } 1954 1955 // find the last id tag in a note (e, p, etc) 1956 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 1957 { 1958 unsigned char *last = NULL; 1959 struct ndb_iterator iter; 1960 struct ndb_str str; 1961 1962 // get the liked event id (last id) 1963 ndb_tags_iterate_start(note, &iter); 1964 1965 while (ndb_tags_iterate_next(&iter)) { 1966 if (iter.tag->count < 2) 1967 continue; 1968 1969 str = ndb_note_str(note, &iter.tag->strs[0]); 1970 1971 // assign liked to the last e tag 1972 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 1973 str = ndb_note_str(note, &iter.tag->strs[1]); 1974 if (str.flag == NDB_PACKED_ID) 1975 last = str.id; 1976 } 1977 } 1978 1979 return last; 1980 } 1981 1982 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 1983 { 1984 MDB_val k, v; 1985 1986 k.mv_data = (unsigned char*)id; 1987 k.mv_size = 32; 1988 1989 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 1990 ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 1991 return NULL; 1992 } 1993 1994 if (len) 1995 *len = v.mv_size; 1996 1997 return v.mv_data; 1998 } 1999 2000 // When receiving a reaction note, look for the liked id and increase the 2001 // reaction counter in the note metadata database 2002 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 2003 { 2004 size_t len; 2005 void *root; 2006 int reactions, rc; 2007 MDB_val key, val; 2008 NdbEventMeta_table_t meta; 2009 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 2010 2011 if (liked == NULL) 2012 return 0; 2013 2014 root = ndb_get_note_meta(txn, liked, &len); 2015 2016 flatcc_builder_t builder; 2017 flatcc_builder_init(&builder); 2018 NdbEventMeta_start_as_root(&builder); 2019 2020 // no meta record, let's make one 2021 if (root == NULL) { 2022 NdbEventMeta_reactions_add(&builder, 1); 2023 } else { 2024 // clone existing and add to it 2025 meta = NdbEventMeta_as_root(root); 2026 2027 reactions = NdbEventMeta_reactions_get(meta); 2028 NdbEventMeta_clone(&builder, meta); 2029 NdbEventMeta_reactions_add(&builder, reactions + 1); 2030 } 2031 2032 NdbProfileRecord_end_as_root(&builder); 2033 root = flatcc_builder_finalize_aligned_buffer(&builder, &len); 2034 assert(((uint64_t)root % 8) == 0); 2035 2036 if (root == NULL) { 2037 ndb_debug("failed to create note metadata record\n"); 2038 return 0; 2039 } 2040 2041 // metadata is keyed on id because we want to collect stats regardless 2042 // if we have the note yet or not 2043 key.mv_data = liked; 2044 key.mv_size = 32; 2045 2046 val.mv_data = root; 2047 val.mv_size = len; 2048 2049 // write the new meta record 2050 //ndb_debug("writing stats record for "); 2051 //print_hex(liked, 32); 2052 //ndb_debug("\n"); 2053 2054 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 2055 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 2056 return 0; 2057 } 2058 2059 free(root); 2060 2061 return 1; 2062 } 2063 2064 2065 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 2066 uint64_t note_key) 2067 2068 { 2069 struct ndb_tsid tsid; 2070 int rc; 2071 MDB_val key, val; 2072 MDB_dbi id_db; 2073 2074 ndb_tsid_init(&tsid, note->id, note->created_at); 2075 2076 key.mv_data = &tsid; 2077 key.mv_size = sizeof(tsid); 2078 val.mv_data = ¬e_key; 2079 val.mv_size = sizeof(note_key); 2080 2081 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2082 2083 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 2084 ndb_debug("write note id index to db failed: %s\n", 2085 mdb_strerror(rc)); 2086 return 0; 2087 } 2088 2089 return 1; 2090 } 2091 2092 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 2093 uint64_t note_key) 2094 { 2095 struct ndb_u64_tsid tsid; 2096 int rc; 2097 MDB_val key, val; 2098 MDB_dbi kind_db; 2099 2100 ndb_u64_tsid_init(&tsid, note->kind, note->created_at); 2101 2102 key.mv_data = &tsid; 2103 key.mv_size = sizeof(tsid); 2104 val.mv_data = ¬e_key; 2105 val.mv_size = sizeof(note_key); 2106 2107 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 2108 2109 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 2110 ndb_debug("write note kind index to db failed: %s\n", 2111 mdb_strerror(rc)); 2112 return 0; 2113 } 2114 2115 return 1; 2116 } 2117 2118 static void consume_whitespace_or_punctuation(struct cursor *cur) 2119 { 2120 while (cur->p < cur->end) { 2121 if (!is_right_boundary(*cur->p)) 2122 return; 2123 cur->p++; 2124 } 2125 } 2126 2127 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 2128 int word_len, int word_index, 2129 uint64_t timestamp, uint64_t note_id) 2130 { 2131 // cap to some reasonable key size 2132 unsigned char buffer[1024]; 2133 int keysize, rc; 2134 MDB_val k, v; 2135 MDB_dbi text_db; 2136 2137 // build our compressed text index key 2138 if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index, 2139 word_len, word, timestamp, note_id, 2140 &keysize)) { 2141 // probably too big 2142 2143 return 0; 2144 } 2145 2146 k.mv_data = buffer; 2147 k.mv_size = keysize; 2148 2149 v.mv_data = NULL; 2150 v.mv_size = 0; 2151 2152 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 2153 2154 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 2155 ndb_debug("write note text index to db failed: %s\n", 2156 mdb_strerror(rc)); 2157 return 0; 2158 } 2159 2160 return 1; 2161 } 2162 2163 2164 2165 // break a string into individual words for querying or for building the 2166 // fulltext search index. This is callback based so we don't need to 2167 // build up an intermediate structure 2168 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 2169 { 2170 int word_len, words; 2171 const char *word; 2172 2173 words = 0; 2174 2175 while (cur->p < cur->end) { 2176 consume_whitespace_or_punctuation(cur); 2177 if (cur->p >= cur->end) 2178 break; 2179 word = (const char *)cur->p; 2180 2181 if (!consume_until_boundary(cur)) 2182 break; 2183 2184 // start of word or end 2185 word_len = cur->p - (unsigned char *)word; 2186 if (word_len == 0 && cur->p >= cur->end) 2187 break; 2188 2189 if (word_len == 0) { 2190 if (!cursor_skip(cur, 1)) 2191 break; 2192 continue; 2193 } 2194 2195 //ndb_debug("writing word index '%.*s'\n", word_len, word); 2196 2197 if (!fn(ctx, word, word_len, words)) 2198 continue; 2199 2200 words++; 2201 } 2202 2203 return 1; 2204 } 2205 2206 struct ndb_word_writer_ctx 2207 { 2208 struct ndb_txn *txn; 2209 struct ndb_note *note; 2210 uint64_t note_id; 2211 }; 2212 2213 static int ndb_fulltext_word_writer(void *ctx, 2214 const char *word, int word_len, int words) 2215 { 2216 struct ndb_word_writer_ctx *wctx = ctx; 2217 2218 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 2219 wctx->note->created_at, wctx->note_id)) { 2220 // too big to write this one, just skip it 2221 ndb_debug("failed to write word '%.*s' to index\n", word_len, word); 2222 2223 return 0; 2224 } 2225 2226 //fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word); 2227 return 1; 2228 } 2229 2230 static int ndb_write_note_fulltext_index(struct ndb_txn *txn, 2231 struct ndb_note *note, 2232 uint64_t note_id) 2233 { 2234 struct cursor cur; 2235 unsigned char *content; 2236 struct ndb_str str; 2237 struct ndb_word_writer_ctx ctx; 2238 2239 str = ndb_note_str(note, ¬e->content); 2240 // I don't think this should happen? 2241 if (unlikely(str.flag == NDB_PACKED_ID)) 2242 return 0; 2243 2244 content = (unsigned char *)str.str; 2245 2246 make_cursor(content, content + note->content_length, &cur); 2247 2248 ctx.txn = txn; 2249 ctx.note = note; 2250 ctx.note_id = note_id; 2251 2252 ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer); 2253 2254 return 1; 2255 } 2256 2257 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index) 2258 { 2259 struct ndb_search_words *words = ctx; 2260 struct ndb_word *word; 2261 2262 if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS) 2263 return 0; 2264 2265 word = &words->words[words->num_words++]; 2266 word->word = word_str; 2267 word->word_len = word_len; 2268 2269 return 1; 2270 } 2271 2272 static void ndb_search_words_init(struct ndb_search_words *words) 2273 { 2274 words->num_words = 0; 2275 } 2276 2277 static int prefix_count(const char *str1, int len1, const char *str2, int len2) { 2278 int i, count = 0; 2279 int min_len = len1 < len2 ? len1 : len2; 2280 2281 for (i = 0; i < min_len; i++) { 2282 // case insensitive 2283 if (tolower(str1[i]) == tolower(str2[i])) 2284 count++; 2285 else 2286 break; 2287 } 2288 2289 return count; 2290 } 2291 2292 static void ndb_print_text_search_key(struct ndb_text_search_key *key) 2293 { 2294 printf("K<'%.*s' %d %" PRIu64 " note_id:%" PRIu64 ">", key->str_len, key->str, 2295 key->word_index, 2296 key->timestamp, 2297 key->note_id); 2298 } 2299 2300 static int ndb_prefix_matches(struct ndb_text_search_result *result, 2301 struct ndb_word *search_word) 2302 { 2303 // Empty strings shouldn't happen but let's 2304 if (result->key.str_len < 2 || search_word->word_len < 2) 2305 return 0; 2306 2307 // make sure we at least have two matching prefix characters. exact 2308 // matches are nice but range searches allow us to match prefixes as 2309 // well. A double-char prefix is suffient, but maybe we could up this 2310 // in the future. 2311 // 2312 // TODO: How are we handling utf-8 prefix matches like 2313 // japanese? 2314 // 2315 if ( result->key.str[0] != tolower(search_word->word[0]) 2316 && result->key.str[1] != tolower(search_word->word[1]) 2317 ) 2318 return 0; 2319 2320 // count the number of prefix-matched characters. This will be used 2321 // for ranking search results 2322 result->prefix_chars = prefix_count(result->key.str, 2323 result->key.str_len, 2324 search_word->word, 2325 search_word->word_len); 2326 2327 if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 2328 return 0; 2329 2330 return 1; 2331 } 2332 2333 // This is called when scanning the full text search index. Scanning stops 2334 // when we no longer have a prefix match for the word 2335 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, 2336 MDB_val *k, struct ndb_word *search_word, 2337 struct ndb_text_search_result *last_result, 2338 struct ndb_text_search_result *result, 2339 MDB_cursor_op order_op) 2340 { 2341 struct cursor key_cursor; 2342 //struct ndb_text_search_key search_key; 2343 MDB_val v; 2344 int retries; 2345 retries = -1; 2346 2347 make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); 2348 2349 // When op is MDB_SET_RANGE, this initializes the search. Position 2350 // the cursor at the next key greater than or equal to the specified 2351 // key. 2352 // 2353 // Subsequent searches should use MDB_NEXT 2354 if (mdb_cursor_get(cursor, k, &v, op)) { 2355 // we should only do this if we're going in reverse 2356 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 2357 // if set range worked and our key exists, it should be 2358 // the one right before this one 2359 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 2360 return 0; 2361 } else { 2362 return 0; 2363 } 2364 } 2365 2366 retry: 2367 retries++; 2368 /* 2369 printf("continuing from "); 2370 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 2371 ndb_print_text_search_key(&search_key); 2372 } else { printf("??"); } 2373 printf("\n"); 2374 */ 2375 2376 make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); 2377 2378 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 2379 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 2380 return 0; 2381 } 2382 2383 if (last_result) { 2384 if (last_result->key.note_id != result->key.note_id) 2385 return 0; 2386 } 2387 2388 // On success, this could still be not related at all. 2389 // It could just be adjacent to the word. Let's check 2390 // if we have a matching prefix at least. 2391 2392 // Before we unpack the entire key, let's quickly 2393 // unpack just the string to check the prefix. We don't 2394 // need to unpack the entire key if the prefix doesn't 2395 // match 2396 if (!ndb_unpack_text_search_key_string(&key_cursor, 2397 &result->key.str, 2398 &result->key.str_len)) { 2399 // this should never happen 2400 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 2401 return 0; 2402 } 2403 2404 if (!ndb_prefix_matches(result, search_word)) { 2405 /* 2406 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 2407 result->key.str_len, result->key.str, 2408 search_word->word_len, search_word->word); 2409 */ 2410 // we should only do this if we're going in reverse 2411 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 2412 // if set range worked and our key exists, it should be 2413 // the one right before this one 2414 mdb_cursor_get(cursor, k, &v, MDB_PREV); 2415 goto retry; 2416 } else { 2417 return 0; 2418 } 2419 } 2420 2421 // Unpack the remaining text search key, we will need this information 2422 // when building up our search results. 2423 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 2424 // This should never happen 2425 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 2426 return 0; 2427 } 2428 2429 if (last_result) { 2430 if (result->key.word_index < last_result->key.word_index) { 2431 /* 2432 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 2433 result->key.str_len, result->key.str, 2434 last_result->key.str_len, last_result->key.str); 2435 */ 2436 return 0; 2437 } 2438 } 2439 2440 return 1; 2441 } 2442 2443 static void ndb_text_search_results_init( 2444 struct ndb_text_search_results *results) { 2445 results->num_results = 0; 2446 } 2447 2448 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 2449 { 2450 cfg->order = NDB_ORDER_DESCENDING; 2451 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 2452 } 2453 2454 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 2455 enum ndb_search_order order) 2456 { 2457 cfg->order = order; 2458 } 2459 2460 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 2461 { 2462 cfg->limit = limit; 2463 } 2464 2465 int ndb_text_search(struct ndb_txn *txn, const char *query, 2466 struct ndb_text_search_results *results, 2467 struct ndb_text_search_config *config) 2468 { 2469 unsigned char buffer[1024], *buf; 2470 unsigned char saved_buf[1024], *saved; 2471 struct ndb_text_search_result *result, *last_result; 2472 struct ndb_text_search_result candidate, last_candidate; 2473 struct ndb_search_words search_words; 2474 //struct ndb_text_search_key search_key; 2475 struct ndb_word *search_word; 2476 struct cursor cur; 2477 ndb_text_search_key_order_fn key_order_fn; 2478 MDB_dbi text_db; 2479 MDB_cursor *cursor; 2480 MDB_val k, v; 2481 int i, j, keysize, saved_size, limit; 2482 MDB_cursor_op op, order_op; 2483 //int num_note_ids; 2484 2485 saved = NULL; 2486 ndb_text_search_results_init(results); 2487 ndb_search_words_init(&search_words); 2488 2489 // search config 2490 limit = MAX_TEXT_SEARCH_RESULTS; 2491 order_op = MDB_PREV; 2492 key_order_fn = ndb_make_text_search_key_high; 2493 if (config) { 2494 if (config->order == NDB_ORDER_ASCENDING) { 2495 order_op = MDB_NEXT; 2496 key_order_fn = ndb_make_text_search_key_low; 2497 } 2498 limit = min(limit, config->limit); 2499 } 2500 // end search config 2501 2502 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 2503 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 2504 2505 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 2506 if (search_words.num_words == 0) 2507 return 0; 2508 2509 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 2510 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 2511 return 0; 2512 } 2513 2514 // for each word, we recursively find all of the submatches 2515 while (results->num_results < limit) { 2516 last_result = NULL; 2517 result = &results->results[results->num_results]; 2518 2519 // if we have saved, then we continue from the last root search 2520 // sequence 2521 if (saved) { 2522 buf = saved_buf; 2523 saved = NULL; 2524 keysize = saved_size; 2525 2526 k.mv_data = buf; 2527 k.mv_size = saved_size; 2528 2529 // reposition the cursor so we can continue 2530 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 2531 break; 2532 2533 op = order_op; 2534 } else { 2535 // construct a packed fulltext search key using this 2536 // word this key doesn't contain any timestamp or index 2537 // info, so it should range match instead of exact 2538 // match 2539 if (!key_order_fn(buffer, sizeof(buffer), 2540 search_words.words[0].word_len, 2541 search_words.words[0].word, &keysize)) 2542 { 2543 // word is too big to fit in 1024-sized key 2544 continue; 2545 } 2546 2547 buf = buffer; 2548 op = MDB_SET_RANGE; 2549 } 2550 2551 for (j = 0; j < search_words.num_words; j++) { 2552 search_word = &search_words.words[j]; 2553 2554 // shouldn't happen but let's be defensive a bit 2555 if (search_word->word_len == 0) 2556 continue; 2557 2558 // if we already matched a note in this phrase, make 2559 // sure we're including the note id in the query 2560 if (last_result) { 2561 // we are narrowing down a search. 2562 // if we already have this note id, just continue 2563 for (i = 0; i < results->num_results; i++) { 2564 if (results->results[i].key.note_id == last_result->key.note_id) 2565 goto cont; 2566 } 2567 2568 if (!ndb_make_noted_text_search_key( 2569 buffer, sizeof(buffer), 2570 search_word->word_len, 2571 search_word->word, 2572 last_result->key.timestamp, 2573 last_result->key.note_id, 2574 &keysize)) 2575 { 2576 continue; 2577 } 2578 2579 buf = buffer; 2580 } 2581 2582 k.mv_data = buf; 2583 k.mv_size = keysize; 2584 2585 if (!ndb_text_search_next_word(cursor, op, &k, 2586 search_word, 2587 last_result, 2588 &candidate, 2589 order_op)) { 2590 break; 2591 } 2592 2593 *result = candidate; 2594 op = MDB_SET_RANGE; 2595 2596 // save the first key match, since we will continue from 2597 // this on the next root word result 2598 if (j == 0 && !saved) { 2599 memcpy(saved_buf, k.mv_data, k.mv_size); 2600 saved = saved_buf; 2601 saved_size = k.mv_size; 2602 } 2603 2604 last_candidate = *result; 2605 last_result = &last_candidate; 2606 } 2607 2608 cont: 2609 // we matched all of the queries! 2610 if (j == search_words.num_words) { 2611 results->num_results++; 2612 } else if (j == 0) { 2613 break; 2614 } 2615 2616 } 2617 2618 mdb_cursor_close(cursor); 2619 2620 return 1; 2621 } 2622 2623 static uint64_t ndb_write_note(struct ndb_txn *txn, 2624 struct ndb_writer_note *note) 2625 { 2626 int rc; 2627 uint64_t note_key; 2628 MDB_dbi note_db; 2629 MDB_val key, val; 2630 2631 // let's quickly sanity check if we already have this note 2632 if (ndb_get_notekey_by_id(txn, note->note->id)) 2633 return 0; 2634 2635 // get dbs 2636 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 2637 2638 // get new key 2639 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 2640 2641 // write note to event store 2642 key.mv_data = ¬e_key; 2643 key.mv_size = sizeof(note_key); 2644 val.mv_data = note->note; 2645 val.mv_size = note->note_len; 2646 2647 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 2648 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 2649 return 0; 2650 } 2651 2652 // write id index key clustered with created_at 2653 if (!ndb_write_note_id_index(txn, note->note, note_key)) 2654 return 0; 2655 2656 // write note kind index 2657 if (!ndb_write_note_kind_index(txn, note->note, note_key)) 2658 return 0; 2659 2660 // only do fulltext index on text and longform notes 2661 if (note->note->kind == 1 || note->note->kind == 30023) { 2662 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 2663 return 0; 2664 } 2665 2666 if (note->note->kind == 7) { 2667 ndb_write_reaction_stats(txn, note->note); 2668 } 2669 2670 return note_key; 2671 } 2672 2673 // only to be called from the writer thread 2674 static void ndb_write_version(struct ndb_txn *txn, uint64_t version) 2675 { 2676 int rc; 2677 MDB_val key, val; 2678 uint64_t version_key; 2679 2680 version_key = NDB_META_KEY_VERSION; 2681 2682 key.mv_data = &version_key; 2683 key.mv_size = sizeof(version_key); 2684 val.mv_data = &version; 2685 val.mv_size = sizeof(version); 2686 2687 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { 2688 ndb_debug("write version to ndb_meta failed: %s\n", 2689 mdb_strerror(rc)); 2690 return; 2691 } 2692 2693 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 2694 } 2695 2696 static void *ndb_writer_thread(void *data) 2697 { 2698 struct ndb_writer *writer = data; 2699 struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; 2700 int i, popped, done, any_note; 2701 uint64_t note_nkey; 2702 MDB_txn *mdb_txn = NULL; 2703 struct ndb_txn txn; 2704 ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); 2705 2706 done = 0; 2707 while (!done) { 2708 txn.mdb_txn = NULL; 2709 popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); 2710 //ndb_debug("writer popped %d items\n", popped); 2711 2712 any_note = 0; 2713 for (i = 0 ; i < popped; i++) { 2714 msg = &msgs[i]; 2715 switch (msg->type) { 2716 case NDB_WRITER_NOTE: any_note = 1; break; 2717 case NDB_WRITER_PROFILE: any_note = 1; break; 2718 case NDB_WRITER_DBMETA: any_note = 1; break; 2719 case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break; 2720 case NDB_WRITER_QUIT: break; 2721 } 2722 } 2723 2724 if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) 2725 { 2726 fprintf(stderr, "writer thread txn_begin failed"); 2727 // should definitely not happen unless DB is full 2728 // or something ? 2729 assert(false); 2730 } 2731 2732 for (i = 0; i < popped; i++) { 2733 msg = &msgs[i]; 2734 2735 switch (msg->type) { 2736 case NDB_WRITER_QUIT: 2737 // quits are handled before this 2738 done = 1; 2739 continue; 2740 case NDB_WRITER_PROFILE: 2741 note_nkey = 2742 ndb_write_note(&txn, &msg->note); 2743 if (msg->profile.record.builder) { 2744 // only write if parsing didn't fail 2745 ndb_write_profile(&txn, &msg->profile, 2746 note_nkey); 2747 } 2748 break; 2749 case NDB_WRITER_NOTE: 2750 ndb_write_note(&txn, &msg->note); 2751 //printf("wrote note "); 2752 //print_hex(msg->note.note->id, 32); 2753 //printf("\n"); 2754 break; 2755 case NDB_WRITER_DBMETA: 2756 ndb_write_version(&txn, msg->ndb_meta.version); 2757 break; 2758 case NDB_WRITER_PROFILE_LAST_FETCH: 2759 ndb_writer_last_profile_fetch(&txn, 2760 msg->last_fetch.pubkey, 2761 msg->last_fetch.fetched_at 2762 ); 2763 break; 2764 } 2765 } 2766 2767 // commit writes 2768 if (any_note && !ndb_end_query(&txn)) { 2769 fprintf(stderr, "writer thread txn commit failed"); 2770 assert(false); 2771 } 2772 2773 // free notes 2774 for (i = 0; i < popped; i++) { 2775 msg = &msgs[i]; 2776 if (msg->type == NDB_WRITER_NOTE) 2777 free(msg->note.note); 2778 else if (msg->type == NDB_WRITER_PROFILE) { 2779 free(msg->profile.note.note); 2780 ndb_profile_record_builder_free(&msg->profile.record); 2781 } 2782 } 2783 } 2784 2785 ndb_debug("quitting writer thread\n"); 2786 return NULL; 2787 } 2788 2789 static void *ndb_ingester_thread(void *data) 2790 { 2791 secp256k1_context *ctx; 2792 struct thread *thread = data; 2793 struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; 2794 struct ndb_lmdb *lmdb = ingester->writer->lmdb; 2795 struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; 2796 struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; 2797 int i, to_write, popped, done, any_event; 2798 MDB_txn *read_txn = NULL; 2799 int rc; 2800 2801 ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); 2802 ndb_debug("started ingester thread\n"); 2803 2804 done = 0; 2805 while (!done) { 2806 to_write = 0; 2807 any_event = 0; 2808 2809 popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 2810 //ndb_debug("ingester popped %d items\n", popped); 2811 2812 for (i = 0; i < popped; i++) { 2813 msg = &msgs[i]; 2814 if (msg->type == NDB_INGEST_EVENT) { 2815 any_event = 1; 2816 break; 2817 } 2818 } 2819 2820 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 2821 // this is bad 2822 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 2823 mdb_strerror(rc)); 2824 continue; 2825 } 2826 2827 for (i = 0; i < popped; i++) { 2828 msg = &msgs[i]; 2829 switch (msg->type) { 2830 case NDB_INGEST_QUIT: 2831 done = 1; 2832 break; 2833 2834 case NDB_INGEST_EVENT: 2835 out = &outs[to_write]; 2836 if (ndb_ingester_process_event(ctx, ingester, 2837 &msg->event, out, 2838 read_txn)) { 2839 to_write++; 2840 } 2841 } 2842 } 2843 2844 if (any_event) 2845 mdb_txn_abort(read_txn); 2846 2847 if (to_write > 0) { 2848 //ndb_debug("pushing %d events to write queue\n", to_write); 2849 if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) { 2850 ndb_debug("failed pushing %d events to write queue\n", to_write); 2851 } 2852 } 2853 } 2854 2855 ndb_debug("quitting ingester thread\n"); 2856 secp256k1_context_destroy(ctx); 2857 return NULL; 2858 } 2859 2860 2861 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb) 2862 { 2863 writer->lmdb = lmdb; 2864 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 2865 writer->queue_buf = malloc(writer->queue_buflen); 2866 if (writer->queue_buf == NULL) { 2867 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 2868 return 0; 2869 } 2870 2871 // init the writer queue. 2872 prot_queue_init(&writer->inbox, writer->queue_buf, 2873 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 2874 2875 // spin up the writer thread 2876 if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer)) 2877 { 2878 fprintf(stderr, "ndb writer thread failed to create\n"); 2879 return 0; 2880 } 2881 2882 return 1; 2883 } 2884 2885 // initialize the ingester queue and then spawn the thread 2886 static int ndb_ingester_init(struct ndb_ingester *ingester, 2887 struct ndb_writer *writer, 2888 struct ndb_config *config) 2889 { 2890 int elem_size, num_elems; 2891 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 2892 2893 // TODO: configurable queue sizes 2894 elem_size = sizeof(struct ndb_ingester_msg); 2895 num_elems = DEFAULT_QUEUE_SIZE; 2896 2897 ingester->writer = writer; 2898 ingester->flags = config->flags; 2899 ingester->filter = config->ingest_filter; 2900 ingester->filter_context = config->filter_context; 2901 2902 if (!threadpool_init(&ingester->tp, config->ingester_threads, 2903 elem_size, num_elems, &quit_msg, ingester, 2904 ndb_ingester_thread)) 2905 { 2906 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 2907 return 0; 2908 } 2909 2910 return 1; 2911 } 2912 2913 static int ndb_writer_destroy(struct ndb_writer *writer) 2914 { 2915 struct ndb_writer_msg msg; 2916 2917 // kill thread 2918 msg.type = NDB_WRITER_QUIT; 2919 if (!prot_queue_push(&writer->inbox, &msg)) { 2920 // queue is too full to push quit message. just kill it. 2921 pthread_exit(&writer->thread_id); 2922 } else { 2923 pthread_join(writer->thread_id, NULL); 2924 } 2925 2926 // cleanup 2927 prot_queue_destroy(&writer->inbox); 2928 2929 free(writer->queue_buf); 2930 2931 return 1; 2932 } 2933 2934 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 2935 { 2936 threadpool_destroy(&ingester->tp); 2937 return 1; 2938 } 2939 2940 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 2941 char *json, unsigned len, unsigned client) 2942 { 2943 struct ndb_ingester_msg msg; 2944 msg.type = NDB_INGEST_EVENT; 2945 2946 msg.event.json = json; 2947 msg.event.len = len; 2948 msg.event.client = client; 2949 2950 return threadpool_dispatch(&ingester->tp, &msg); 2951 } 2952 2953 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 2954 { 2955 int rc; 2956 MDB_txn *txn; 2957 2958 if ((rc = mdb_env_create(&lmdb->env))) { 2959 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 2960 return 0; 2961 } 2962 2963 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 2964 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 2965 return 0; 2966 } 2967 2968 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 2969 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 2970 return 0; 2971 } 2972 2973 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 2974 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 2975 return 0; 2976 } 2977 2978 // Initialize DBs 2979 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 2980 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 2981 return 0; 2982 } 2983 2984 // note flatbuffer db 2985 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 2986 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 2987 return 0; 2988 } 2989 2990 // note metadata db 2991 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 2992 fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); 2993 return 0; 2994 } 2995 2996 // profile flatbuffer db 2997 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 2998 fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc); 2999 return 0; 3000 } 3001 3002 // profile search db 3003 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 3004 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 3005 return 0; 3006 } 3007 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 3008 3009 // ndb metadata (db version, etc) 3010 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 3011 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 3012 return 0; 3013 } 3014 3015 // profile last fetches 3016 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 3017 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 3018 return 0; 3019 } 3020 3021 // id+ts index flags 3022 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 3023 3024 // index dbs 3025 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 3026 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3027 return 0; 3028 } 3029 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 3030 3031 if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 3032 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3033 return 0; 3034 } 3035 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare); 3036 3037 if ((rc = mdb_dbi_open(txn, "note_kind", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 3038 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3039 return 0; 3040 } 3041 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare); 3042 3043 if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 3044 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 3045 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3046 return 0; 3047 } 3048 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 3049 3050 // Commit the transaction 3051 if ((rc = mdb_txn_commit(txn))) { 3052 fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); 3053 return 0; 3054 } 3055 3056 return 1; 3057 } 3058 3059 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 3060 { 3061 struct ndb_writer_msg msg; 3062 msg.type = NDB_WRITER_DBMETA; 3063 msg.ndb_meta.version = version; 3064 return ndb_writer_queue_msg(&ndb->writer, &msg); 3065 } 3066 3067 static int ndb_run_migrations(struct ndb *ndb) 3068 { 3069 uint64_t version, latest_version, i; 3070 3071 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 3072 3073 if ((version = ndb_db_version(ndb)) == -1) { 3074 ndb_debug("run_migrations: no version found, assuming new db\n"); 3075 version = latest_version; 3076 3077 // no version found. fresh db? 3078 if (!ndb_queue_write_version(ndb, version)) { 3079 fprintf(stderr, "run_migrations: failed writing db version"); 3080 return 0; 3081 } 3082 3083 return 1; 3084 } else { 3085 ndb_debug("ndb: version %" PRIu64 " found\n", version); 3086 } 3087 3088 if (version < latest_version) 3089 ndb_debug("nostrdb: migrating v%d -> v%d\n", 3090 (int)version, (int)latest_version); 3091 3092 for (i = version; i < latest_version; i++) { 3093 if (!MIGRATIONS[i].fn(ndb)) { 3094 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 3095 return 0; 3096 } 3097 3098 if (!ndb_queue_write_version(ndb, i+1)) { 3099 fprintf(stderr, "run_migrations: failed writing db version"); 3100 return 0; 3101 } 3102 3103 version = i+1; 3104 } 3105 3106 ndb->version = version; 3107 3108 return 1; 3109 } 3110 3111 int ndb_init(struct ndb **pndb, const char *filename, struct ndb_config *config) 3112 { 3113 struct ndb *ndb; 3114 //MDB_dbi ind_id; // TODO: ind_pk, etc 3115 3116 ndb = *pndb = calloc(1, sizeof(struct ndb)); 3117 ndb->flags = config->flags; 3118 3119 if (ndb == NULL) { 3120 fprintf(stderr, "ndb_init: malloc failed\n"); 3121 return 0; 3122 } 3123 3124 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 3125 return 0; 3126 3127 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) { 3128 fprintf(stderr, "ndb_writer_init failed\n"); 3129 return 0; 3130 } 3131 3132 if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) { 3133 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 3134 config->ingester_threads); 3135 return 0; 3136 } 3137 3138 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) && 3139 !ndb_run_migrations(ndb)) { 3140 fprintf(stderr, "failed to run migrations\n"); 3141 return 0; 3142 } 3143 3144 // Initialize LMDB environment and spin up threads 3145 return 1; 3146 } 3147 3148 void ndb_destroy(struct ndb *ndb) 3149 { 3150 if (ndb == NULL) 3151 return; 3152 3153 // ingester depends on writer and must be destroyed first 3154 ndb_ingester_destroy(&ndb->ingester); 3155 ndb_writer_destroy(&ndb->writer); 3156 3157 mdb_env_close(ndb->lmdb.env); 3158 3159 free(ndb); 3160 } 3161 3162 // Process a nostr event from a client 3163 // 3164 // ie: ["EVENT", {"content":"..."} ...] 3165 // 3166 // The client-sent variation of ndb_process_event 3167 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 3168 { 3169 // Since we need to return as soon as possible, and we're not 3170 // making any assumptions about the lifetime of the string, we 3171 // definitely need to copy the json here. In the future once we 3172 // have our thread that manages a websocket connection, we can 3173 // avoid the copy and just use the buffer we get from that 3174 // thread. 3175 char *json_copy = strdupn(json, len); 3176 if (json_copy == NULL) 3177 return 0; 3178 3179 return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1); 3180 } 3181 3182 // Process anostr event from a relay, 3183 // 3184 // ie: ["EVENT", "subid", {"content":"..."}...] 3185 // 3186 // This function returns as soon as possible, first copying the passed 3187 // json and then queueing it up for processing. Worker threads then take 3188 // the json and process it. 3189 // 3190 // Processing: 3191 // 3192 // 1. The event is parsed into ndb_notes and the signature is validated 3193 // 2. A quick lookup is made on the database to see if we already have 3194 // the note id, if we do we don't need to waste time on json parsing 3195 // or note validation. 3196 // 3. Once validation is done we pass it to the writer queue for writing 3197 // to LMDB. 3198 // 3199 int ndb_process_event(struct ndb *ndb, const char *json, int json_len) 3200 { 3201 // Since we need to return as soon as possible, and we're not 3202 // making any assumptions about the lifetime of the string, we 3203 // definitely need to copy the json here. In the future once we 3204 // have our thread that manages a websocket connection, we can 3205 // avoid the copy and just use the buffer we get from that 3206 // thread. 3207 char *json_copy = strdupn(json, json_len); 3208 if (json_copy == NULL) 3209 return 0; 3210 3211 return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0); 3212 } 3213 3214 3215 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client) 3216 { 3217 const char *start, *end, *very_end; 3218 start = ldjson; 3219 end = start + json_len; 3220 very_end = ldjson + json_len; 3221 int (* process)(struct ndb *, const char *, int); 3222 #if DEBUG 3223 int processed = 0; 3224 #endif 3225 process = client ? ndb_process_client_event : ndb_process_event; 3226 3227 while ((end = fast_strchr(start, '\n', very_end - start))) { 3228 //printf("processing '%.*s'\n", (int)(end-start), start); 3229 if (!process(ndb, start, end - start)) { 3230 ndb_debug("ndb_process_client_event failed\n"); 3231 return 0; 3232 } 3233 start = end + 1; 3234 #if DEBUG 3235 processed++; 3236 #endif 3237 } 3238 3239 #if DEBUG 3240 ndb_debug("ndb_process_events: processed %d events\n", processed); 3241 #endif 3242 3243 return 1; 3244 } 3245 3246 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) 3247 { 3248 return _ndb_process_events(ndb, ldjson, json_len, 1); 3249 } 3250 3251 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len) 3252 { 3253 return _ndb_process_events(ndb, ldjson, json_len, 0); 3254 } 3255 3256 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag) 3257 { 3258 return cursor_push_u16(cur, tag->count); 3259 } 3260 3261 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, 3262 int bufsize) 3263 { 3264 struct ndb_note *note; 3265 int half, size, str_indices_size; 3266 3267 // come on bruh 3268 if (bufsize < sizeof(struct ndb_note) * 2) 3269 return 0; 3270 3271 str_indices_size = bufsize / 32; 3272 size = bufsize - str_indices_size; 3273 half = size / 2; 3274 3275 //debug("size %d half %d str_indices %d\n", size, half, str_indices_size); 3276 3277 // make a safe cursor of our available memory 3278 make_cursor(buf, buf + bufsize, &builder->mem); 3279 3280 note = builder->note = (struct ndb_note *)buf; 3281 3282 // take slices of the memory into subcursors 3283 if (!(cursor_slice(&builder->mem, &builder->note_cur, half) && 3284 cursor_slice(&builder->mem, &builder->strings, half) && 3285 cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) { 3286 return 0; 3287 } 3288 3289 memset(note, 0, sizeof(*note)); 3290 builder->note_cur.p += sizeof(*note); 3291 3292 note->strings = builder->strings.start - buf; 3293 note->version = 1; 3294 3295 return 1; 3296 } 3297 3298 3299 3300 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 3301 const char *json, int json_len, 3302 unsigned char *buf, int bufsize) 3303 { 3304 int half = bufsize / 2; 3305 3306 unsigned char *tok_start = buf + half; 3307 unsigned char *tok_end = buf + bufsize; 3308 3309 p->toks = (jsmntok_t*)tok_start; 3310 p->toks_end = (jsmntok_t*)tok_end; 3311 p->num_tokens = 0; 3312 p->json = json; 3313 p->json_len = json_len; 3314 3315 // ndb_builder gets the first half of the buffer, and jsmn gets the 3316 // second half. I like this way of alloating memory (without actually 3317 // dynamically allocating memory). You get one big chunk upfront and 3318 // then submodules can recursively subdivide it. Maybe you could do 3319 // something even more clever like golden-ratio style subdivision where 3320 // the more important stuff gets a larger chunk and then it spirals 3321 // downward into smaller chunks. Thanks for coming to my TED talk. 3322 3323 if (!ndb_builder_init(&p->builder, buf, half)) 3324 return 0; 3325 3326 jsmn_init(&p->json_parser); 3327 3328 return 1; 3329 } 3330 3331 static inline int ndb_json_parser_parse(struct ndb_json_parser *p, 3332 struct ndb_id_cb *cb) 3333 { 3334 jsmntok_t *tok; 3335 int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); 3336 int res = 3337 jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); 3338 3339 // got an ID! 3340 if (res == -42) { 3341 tok = &p->toks[p->json_parser.toknext-1]; 3342 3343 switch (cb->fn(cb->data, p->json + tok->start)) { 3344 case NDB_IDRES_CONT: 3345 res = jsmn_parse(&p->json_parser, p->json, p->json_len, 3346 p->toks, cap, 0); 3347 break; 3348 case NDB_IDRES_STOP: 3349 return -42; 3350 } 3351 } else if (res == 0) { 3352 return 0; 3353 } 3354 3355 p->num_tokens = res; 3356 p->i = 0; 3357 3358 return 1; 3359 } 3360 3361 static inline int toksize(jsmntok_t *tok) 3362 { 3363 return tok->end - tok->start; 3364 } 3365 3366 3367 3368 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2) 3369 { 3370 switch (c2) { 3371 case 't': return cursor_push_byte(cur, '\t'); 3372 case 'n': return cursor_push_byte(cur, '\n'); 3373 case 'r': return cursor_push_byte(cur, '\r'); 3374 case 'b': return cursor_push_byte(cur, '\b'); 3375 case 'f': return cursor_push_byte(cur, '\f'); 3376 case '\\': return cursor_push_byte(cur, '\\'); 3377 case '/': return cursor_push_byte(cur, '/'); 3378 case '"': return cursor_push_byte(cur, '"'); 3379 case 'u': 3380 // these aren't handled yet 3381 return 0; 3382 default: 3383 return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2); 3384 } 3385 } 3386 3387 static int cursor_push_escaped_char(struct cursor *cur, char c) 3388 { 3389 switch (c) { 3390 case '"': return cursor_push_str(cur, "\\\""); 3391 case '\\': return cursor_push_str(cur, "\\\\"); 3392 case '\b': return cursor_push_str(cur, "\\b"); 3393 case '\f': return cursor_push_str(cur, "\\f"); 3394 case '\n': return cursor_push_str(cur, "\\n"); 3395 case '\r': return cursor_push_str(cur, "\\r"); 3396 case '\t': return cursor_push_str(cur, "\\t"); 3397 // TODO: \u hex hex hex hex 3398 } 3399 return cursor_push_byte(cur, c); 3400 } 3401 3402 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len) 3403 { 3404 int i; 3405 3406 if (len % 2 != 0) 3407 return 0; 3408 3409 if (!cursor_push_byte(cur, '"')) 3410 return 0; 3411 3412 for (i = 0; i < len; i++) { 3413 unsigned int c = ((const unsigned char *)buf)[i]; 3414 if (!cursor_push_byte(cur, hexchar(c >> 4))) 3415 return 0; 3416 if (!cursor_push_byte(cur, hexchar(c & 0xF))) 3417 return 0; 3418 } 3419 3420 if (!cursor_push_byte(cur, '"')) 3421 return 0; 3422 3423 return 1; 3424 } 3425 3426 static int cursor_push_jsonstr(struct cursor *cur, const char *str) 3427 { 3428 int i; 3429 int len; 3430 3431 len = strlen(str); 3432 3433 if (!cursor_push_byte(cur, '"')) 3434 return 0; 3435 3436 for (i = 0; i < len; i++) { 3437 if (!cursor_push_escaped_char(cur, str[i])) 3438 return 0; 3439 } 3440 3441 if (!cursor_push_byte(cur, '"')) 3442 return 0; 3443 3444 return 1; 3445 } 3446 3447 3448 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str) 3449 { 3450 if (str.flag == NDB_PACKED_ID) 3451 return cursor_push_hex_str(cur, str.id, 32); 3452 3453 return cursor_push_jsonstr(cur, str.str); 3454 } 3455 3456 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note, 3457 struct ndb_tag *tag) 3458 { 3459 int i; 3460 3461 if (!cursor_push_byte(cur, '[')) 3462 return 0; 3463 3464 for (i = 0; i < tag->count; i++) { 3465 if (!cursor_push_json_tag_str(cur, ndb_note_str(note, &tag->strs[i]))) 3466 return 0; 3467 if (i != tag->count-1 && !cursor_push_byte(cur, ',')) 3468 return 0; 3469 } 3470 3471 return cursor_push_byte(cur, ']'); 3472 } 3473 3474 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 3475 { 3476 int i; 3477 struct ndb_iterator iter, *it = &iter; 3478 ndb_tags_iterate_start(note, it); 3479 3480 if (!cursor_push_byte(cur, '[')) 3481 return 0; 3482 3483 i = 0; 3484 while (ndb_tags_iterate_next(it)) { 3485 if (!cursor_push_json_tag(cur, note, it->tag)) 3486 return 0; 3487 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 3488 return 0; 3489 i++; 3490 } 3491 3492 if (!cursor_push_byte(cur, ']')) 3493 return 0; 3494 3495 return 1; 3496 } 3497 3498 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen) 3499 { 3500 char timebuf[16] = {0}; 3501 char kindbuf[16] = {0}; 3502 char pubkey[65]; 3503 struct cursor cur; 3504 int ok; 3505 3506 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey, sizeof(pubkey))) 3507 return 0; 3508 3509 make_cursor(buf, buf + buflen, &cur); 3510 3511 // TODO: update in 2106 ... 3512 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 3513 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 3514 3515 ok = 3516 cursor_push_str(&cur, "[0,\"") && 3517 cursor_push_str(&cur, pubkey) && 3518 cursor_push_str(&cur, "\",") && 3519 cursor_push_str(&cur, timebuf) && 3520 cursor_push_str(&cur, ",") && 3521 cursor_push_str(&cur, kindbuf) && 3522 cursor_push_str(&cur, ",") && 3523 cursor_push_json_tags(&cur, ev) && 3524 cursor_push_str(&cur, ",") && 3525 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 3526 cursor_push_str(&cur, "]"); 3527 3528 if (!ok) 3529 return 0; 3530 3531 return cur.p - cur.start; 3532 } 3533 3534 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 3535 int len; 3536 3537 if (!(len = ndb_event_commitment(note, buf, buflen))) 3538 return 0; 3539 3540 //fprintf(stderr, "%.*s\n", len, buf); 3541 3542 sha256((struct sha256*)note->id, buf, len); 3543 3544 return 1; 3545 } 3546 3547 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 3548 unsigned char sig[64]) 3549 { 3550 unsigned char aux[32]; 3551 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 3552 3553 if (!fill_random(aux, sizeof(aux))) 3554 return 0; 3555 3556 secp256k1_context *ctx = 3557 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 3558 3559 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 3560 } 3561 3562 int ndb_create_keypair(struct ndb_keypair *kp) 3563 { 3564 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 3565 secp256k1_xonly_pubkey pubkey; 3566 3567 secp256k1_context *ctx = 3568 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 3569 3570 /* Try to create a keypair with a valid context, it should only 3571 * fail if the secret key is zero or out of range. */ 3572 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 3573 return 0; 3574 3575 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 3576 return 0; 3577 3578 /* Serialize the public key. Should always return 1 for a valid public key. */ 3579 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 3580 } 3581 3582 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 3583 { 3584 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 3585 fprintf(stderr, "could not hex decode secret key\n"); 3586 return 0; 3587 } 3588 3589 return ndb_create_keypair(keypair); 3590 } 3591 3592 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 3593 struct ndb_keypair *keypair) 3594 { 3595 int strings_len = builder->strings.p - builder->strings.start; 3596 unsigned char *note_end = builder->note_cur.p + strings_len; 3597 int total_size = note_end - builder->note_cur.start; 3598 3599 // move the strings buffer next to the end of our ndb_note 3600 memmove(builder->note_cur.p, builder->strings.start, strings_len); 3601 3602 // set the strings location 3603 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 3604 3605 // record the total size 3606 //builder->note->size = total_size; 3607 3608 *note = builder->note; 3609 3610 // generate id and sign if we're building this manually 3611 if (keypair) { 3612 // use the remaining memory for building our id buffer 3613 unsigned char *end = builder->mem.end; 3614 unsigned char *start = (unsigned char*)(*note) + total_size; 3615 3616 ndb_builder_set_pubkey(builder, keypair->pubkey); 3617 3618 if (!ndb_calculate_id(builder->note, start, end - start)) 3619 return 0; 3620 3621 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 3622 return 0; 3623 } 3624 3625 // make sure we're aligned as a whole 3626 total_size = (total_size + 7) & ~7; 3627 assert((total_size % 8) == 0); 3628 return total_size; 3629 } 3630 3631 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 3632 { 3633 return builder->note; 3634 } 3635 3636 /// find an existing string via str_indices. these indices only exist in the 3637 /// builder phase just for this purpose. 3638 static inline int ndb_builder_find_str(struct ndb_builder *builder, 3639 const char *str, int len, 3640 union ndb_packed_str *pstr) 3641 { 3642 // find existing matching string to avoid duplicate strings 3643 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 3644 for (int i = 0; i < indices; i++) { 3645 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 3646 const char *some_str = (const char*)builder->strings.start + index; 3647 3648 if (!memcmp(some_str, str, len)) { 3649 // found an existing matching str, use that index 3650 *pstr = ndb_offset_str(index); 3651 return 1; 3652 } 3653 } 3654 3655 return 0; 3656 } 3657 3658 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 3659 int len, union ndb_packed_str *pstr) 3660 { 3661 uint32_t loc; 3662 3663 // no string found, push a new one 3664 loc = builder->strings.p - builder->strings.start; 3665 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 3666 cursor_push_byte(&builder->strings, '\0'))) { 3667 return 0; 3668 } 3669 3670 *pstr = ndb_offset_str(loc); 3671 3672 // record in builder indices. ignore return value, if we can't cache it 3673 // then whatever 3674 cursor_push_u32(&builder->str_indices, loc); 3675 3676 return 1; 3677 } 3678 3679 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 3680 unsigned char *id, 3681 union ndb_packed_str *pstr) 3682 { 3683 // Don't both find id duplicates. very rarely are they duplicated 3684 // and it slows things down quite a bit. If we really care about this 3685 // We can switch to a hash table. 3686 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 3687 // pstr->packed.flag = NDB_PACKED_ID; 3688 // return 1; 3689 //} 3690 3691 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 3692 pstr->packed.flag = NDB_PACKED_ID; 3693 return 1; 3694 } 3695 3696 return 0; 3697 } 3698 3699 3700 /// Check for small strings to pack 3701 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 3702 const char *str, int len, 3703 union ndb_packed_str *pstr, 3704 int pack_ids) 3705 { 3706 unsigned char id_buf[32]; 3707 3708 if (len == 0) { 3709 *pstr = ndb_char_to_packed_str(0); 3710 return 1; 3711 } else if (len == 1) { 3712 *pstr = ndb_char_to_packed_str(str[0]); 3713 return 1; 3714 } else if (len == 2) { 3715 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 3716 return 1; 3717 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 3718 return ndb_builder_push_packed_id(builder, id_buf, pstr); 3719 } 3720 3721 return 0; 3722 } 3723 3724 3725 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 3726 const char *str, int len, 3727 union ndb_packed_str *pstr) 3728 { 3729 if (ndb_builder_find_str(builder, str, len, pstr)) 3730 return 1; 3731 3732 return ndb_builder_push_str(builder, str, len, pstr); 3733 } 3734 3735 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 3736 union ndb_packed_str *pstr, int pack_ids) 3737 { 3738 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 3739 return 1; 3740 3741 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 3742 } 3743 3744 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 3745 int len) 3746 { 3747 int pack_ids = 0; 3748 builder->note->content_length = len; 3749 return ndb_builder_make_str(builder, content, len, 3750 &builder->note->content, pack_ids); 3751 } 3752 3753 3754 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 3755 const char *s) 3756 { 3757 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 3758 memcmp(json + tok->start, s, tok_len) == 0) { 3759 return 1; 3760 } 3761 return 0; 3762 } 3763 3764 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 3765 union ndb_packed_str offset) 3766 { 3767 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 3768 return 0; 3769 builder->current_tag->count++; 3770 return 1; 3771 } 3772 3773 /// Unescape and push json strings 3774 static int ndb_builder_make_json_str(struct ndb_builder *builder, 3775 const char *str, int len, 3776 union ndb_packed_str *pstr, 3777 int *written, int pack_ids) 3778 { 3779 // let's not care about de-duping these. we should just unescape 3780 // in-place directly into the strings table. 3781 if (written) 3782 *written = len; 3783 3784 const char *p, *end, *start; 3785 unsigned char *builder_start; 3786 3787 // always try compact strings first 3788 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 3789 return 1; 3790 3791 end = str + len; 3792 start = str; // Initialize start to the beginning of the string 3793 3794 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 3795 builder_start = builder->strings.p; 3796 3797 for (p = str; p < end; p++) { 3798 if (*p == '\\' && p+1 < end) { 3799 // Push the chunk of unescaped characters before this escape sequence 3800 if (start < p && !cursor_push(&builder->strings, 3801 (unsigned char *)start, 3802 p - start)) { 3803 return 0; 3804 } 3805 3806 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 3807 return 0; 3808 3809 p++; // Skip the character following the backslash 3810 start = p + 1; // Update the start pointer to the next character 3811 } 3812 } 3813 3814 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 3815 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 3816 p - start)) { 3817 return 0; 3818 } 3819 3820 if (written) 3821 *written = builder->strings.p - builder_start; 3822 3823 // TODO: dedupe these!? 3824 return cursor_push_byte(&builder->strings, '\0'); 3825 } 3826 3827 static int ndb_builder_push_json_tag(struct ndb_builder *builder, 3828 const char *str, int len) 3829 { 3830 union ndb_packed_str pstr; 3831 int pack_ids = 1; 3832 if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids)) 3833 return 0; 3834 return ndb_builder_finalize_tag(builder, pstr); 3835 } 3836 3837 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag 3838 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p, 3839 jsmntok_t *array) 3840 { 3841 jsmntok_t *str_tok; 3842 const char *str; 3843 3844 if (array->size == 0) 3845 return 0; 3846 3847 if (!ndb_builder_new_tag(&p->builder)) 3848 return 0; 3849 3850 for (int i = 0; i < array->size; i++) { 3851 str_tok = &array[i+1]; 3852 str = p->json + str_tok->start; 3853 3854 if (!ndb_builder_push_json_tag(&p->builder, str, 3855 toksize(str_tok))) { 3856 return 0; 3857 } 3858 } 3859 3860 return 1; 3861 } 3862 3863 // Push json tags into ndb data 3864 // [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags 3865 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p, 3866 jsmntok_t *array) 3867 { 3868 jsmntok_t *tag = array; 3869 3870 if (array->size == 0) 3871 return 1; 3872 3873 for (int i = 0; i < array->size; i++) { 3874 if (!ndb_builder_tag_from_json_array(p, &tag[i+1])) 3875 return 0; 3876 tag += tag[i+1].size; 3877 } 3878 3879 return 1; 3880 } 3881 3882 static int parse_unsigned_int(const char *start, int len, unsigned int *num) 3883 { 3884 unsigned int number = 0; 3885 const char *p = start, *end = start + len; 3886 int digits = 0; 3887 3888 while (p < end) { 3889 char c = *p; 3890 3891 if (c < '0' || c > '9') 3892 break; 3893 3894 // Check for overflow 3895 char digit = c - '0'; 3896 if (number > (UINT_MAX - digit) / 10) 3897 return 0; // Overflow detected 3898 3899 number = number * 10 + digit; 3900 3901 p++; 3902 digits++; 3903 } 3904 3905 if (digits == 0) 3906 return 0; 3907 3908 *num = number; 3909 return 1; 3910 } 3911 3912 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, 3913 unsigned char *buf, int bufsize, struct ndb_id_cb *cb) 3914 { 3915 jsmntok_t *tok = NULL; 3916 int tok_len, res; 3917 struct ndb_json_parser parser; 3918 3919 ndb_json_parser_init(&parser, json, len, buf, bufsize); 3920 3921 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 3922 return res; 3923 3924 if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY) 3925 return 0; 3926 3927 parser.i = 1; 3928 tok = &parser.toks[parser.i++]; 3929 tok_len = toksize(tok); 3930 if (tok->type != JSMN_STRING) 3931 return 0; 3932 3933 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 3934 fce->evtype = NDB_FCE_EVENT; 3935 struct ndb_event *ev = &fce->event; 3936 return ndb_parse_json_note(&parser, &ev->note); 3937 } 3938 3939 return 0; 3940 } 3941 3942 3943 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, 3944 unsigned char *buf, int bufsize, 3945 struct ndb_id_cb *cb) 3946 { 3947 jsmntok_t *tok = NULL; 3948 int tok_len, res; 3949 struct ndb_json_parser parser; 3950 3951 tce->subid_len = 0; 3952 tce->subid = ""; 3953 3954 ndb_json_parser_init(&parser, json, len, buf, bufsize); 3955 3956 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 3957 return res; 3958 3959 if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) 3960 return 0; 3961 3962 parser.i = 1; 3963 tok = &parser.toks[parser.i++]; 3964 tok_len = toksize(tok); 3965 if (tok->type != JSMN_STRING) 3966 return 0; 3967 3968 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 3969 tce->evtype = NDB_TCE_EVENT; 3970 struct ndb_event *ev = &tce->event; 3971 3972 tok = &parser.toks[parser.i++]; 3973 if (tok->type != JSMN_STRING) 3974 return 0; 3975 3976 tce->subid = json + tok->start; 3977 tce->subid_len = toksize(tok); 3978 3979 return ndb_parse_json_note(&parser, &ev->note); 3980 } else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 3981 tce->evtype = NDB_TCE_EOSE; 3982 3983 tok = &parser.toks[parser.i++]; 3984 if (tok->type != JSMN_STRING) 3985 return 0; 3986 3987 tce->subid = json + tok->start; 3988 tce->subid_len = toksize(tok); 3989 return 1; 3990 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 3991 if (parser.num_tokens != 5) 3992 return 0; 3993 3994 struct ndb_command_result *cr = &tce->command_result; 3995 3996 tce->evtype = NDB_TCE_OK; 3997 3998 tok = &parser.toks[parser.i++]; 3999 if (tok->type != JSMN_STRING) 4000 return 0; 4001 4002 tce->subid = json + tok->start; 4003 tce->subid_len = toksize(tok); 4004 4005 tok = &parser.toks[parser.i++]; 4006 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 4007 return 0; 4008 4009 cr->ok = (json + tok->start)[0] == 't'; 4010 4011 tok = &parser.toks[parser.i++]; 4012 if (tok->type != JSMN_STRING) 4013 return 0; 4014 4015 tce->command_result.msg = json + tok->start; 4016 tce->command_result.msglen = toksize(tok); 4017 4018 return 1; 4019 } else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) { 4020 tce->evtype = NDB_TCE_AUTH; 4021 4022 tok = &parser.toks[parser.i++]; 4023 if (tok->type != JSMN_STRING) 4024 return 0; 4025 4026 tce->subid = json + tok->start; 4027 tce->subid_len = toksize(tok); 4028 4029 return 1; 4030 } 4031 4032 return 0; 4033 } 4034 4035 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 4036 { 4037 jsmntok_t *tok = NULL; 4038 unsigned char hexbuf[64]; 4039 const char *json = parser->json; 4040 const char *start; 4041 int i, tok_len, parsed; 4042 4043 parsed = 0; 4044 4045 if (parser->toks[parser->i].type != JSMN_OBJECT) 4046 return 0; 4047 4048 // TODO: build id buffer and verify at end 4049 4050 for (i = parser->i + 1; i < parser->num_tokens; i++) { 4051 tok = &parser->toks[i]; 4052 start = json + tok->start; 4053 tok_len = toksize(tok); 4054 4055 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 4056 if (tok_len == 0 || i + 1 >= parser->num_tokens) 4057 continue; 4058 4059 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 4060 // pubkey 4061 tok = &parser->toks[i+1]; 4062 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 4063 parsed |= NDB_PARSED_PUBKEY; 4064 ndb_builder_set_pubkey(&parser->builder, hexbuf); 4065 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 4066 // id 4067 tok = &parser->toks[i+1]; 4068 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 4069 parsed |= NDB_PARSED_ID; 4070 ndb_builder_set_id(&parser->builder, hexbuf); 4071 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 4072 // sig 4073 tok = &parser->toks[i+1]; 4074 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 4075 parsed |= NDB_PARSED_SIG; 4076 ndb_builder_set_sig(&parser->builder, hexbuf); 4077 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 4078 // kind 4079 tok = &parser->toks[i+1]; 4080 start = json + tok->start; 4081 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 4082 return 0; 4083 if (!parse_unsigned_int(start, toksize(tok), 4084 &parser->builder.note->kind)) 4085 return 0; 4086 parsed |= NDB_PARSED_KIND; 4087 } else if (start[0] == 'c') { 4088 if (jsoneq(json, tok, tok_len, "created_at")) { 4089 // created_at 4090 tok = &parser->toks[i+1]; 4091 start = json + tok->start; 4092 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 4093 return 0; 4094 // TODO: update to int64 in 2106 ... xD 4095 unsigned int bigi; 4096 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 4097 return 0; 4098 parser->builder.note->created_at = bigi; 4099 parsed |= NDB_PARSED_CREATED_AT; 4100 } else if (jsoneq(json, tok, tok_len, "content")) { 4101 // content 4102 tok = &parser->toks[i+1]; 4103 union ndb_packed_str pstr; 4104 tok_len = toksize(tok); 4105 int written, pack_ids = 0; 4106 if (!ndb_builder_make_json_str(&parser->builder, 4107 json + tok->start, 4108 tok_len, &pstr, 4109 &written, pack_ids)) { 4110 ndb_debug("ndb_builder_make_json_str failed\n"); 4111 return 0; 4112 } 4113 parser->builder.note->content_length = written; 4114 parser->builder.note->content = pstr; 4115 parsed |= NDB_PARSED_CONTENT; 4116 } 4117 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 4118 tok = &parser->toks[i+1]; 4119 ndb_builder_process_json_tags(parser, tok); 4120 i += tok->size; 4121 parsed |= NDB_PARSED_TAGS; 4122 } 4123 } 4124 4125 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 4126 if (parsed != NDB_PARSED_ALL) 4127 return 0; 4128 4129 return ndb_builder_finalize(&parser->builder, note, NULL); 4130 } 4131 4132 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 4133 unsigned char *buf, int bufsize) 4134 { 4135 struct ndb_json_parser parser; 4136 int res; 4137 4138 ndb_json_parser_init(&parser, json, len, buf, bufsize); 4139 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 4140 return res; 4141 4142 if (parser.num_tokens < 1) 4143 return 0; 4144 4145 return ndb_parse_json_note(&parser, note); 4146 } 4147 4148 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 4149 { 4150 memcpy(builder->note->pubkey, pubkey, 32); 4151 } 4152 4153 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 4154 { 4155 memcpy(builder->note->id, id, 32); 4156 } 4157 4158 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 4159 { 4160 memcpy(builder->note->sig, sig, 64); 4161 } 4162 4163 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 4164 { 4165 builder->note->kind = kind; 4166 } 4167 4168 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 4169 { 4170 builder->note->created_at = created_at; 4171 } 4172 4173 int ndb_builder_new_tag(struct ndb_builder *builder) 4174 { 4175 builder->note->tags.count++; 4176 struct ndb_tag tag = {0}; 4177 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 4178 return cursor_push_tag(&builder->note_cur, &tag); 4179 } 4180 4181 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 4182 { 4183 counts->count = 0; 4184 counts->key_size = 0; 4185 counts->value_size = 0; 4186 } 4187 4188 static void ndb_stat_init(struct ndb_stat *stat) 4189 { 4190 // init stats 4191 int i; 4192 4193 for (i = 0; i < NDB_CKIND_COUNT; i++) { 4194 ndb_stat_counts_init(&stat->common_kinds[i]); 4195 } 4196 4197 for (i = 0; i < NDB_DBS; i++) { 4198 ndb_stat_counts_init(&stat->dbs[i]); 4199 } 4200 4201 ndb_stat_counts_init(&stat->other_kinds); 4202 } 4203 4204 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 4205 { 4206 int rc; 4207 MDB_cursor *cur; 4208 MDB_val k, v; 4209 MDB_dbi db; 4210 struct ndb_txn txn; 4211 struct ndb_note *note; 4212 int i; 4213 enum ndb_common_kind common_kind; 4214 4215 // initialize to 0 4216 ndb_stat_init(stat); 4217 4218 if (!ndb_begin_query(ndb, &txn)) { 4219 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 4220 return 0; 4221 } 4222 4223 // stat each dbi in the database 4224 for (i = 0; i < NDB_DBS; i++) 4225 { 4226 db = ndb->lmdb.dbs[i]; 4227 4228 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 4229 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 4230 mdb_strerror(rc)); 4231 return 0; 4232 } 4233 4234 // loop over every entry and count kv sizes 4235 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 4236 // we gather more detailed per-kind stats if we're in 4237 // the notes db 4238 if (i == NDB_DB_NOTE) { 4239 note = v.mv_data; 4240 common_kind = ndb_kind_to_common_kind(note->kind); 4241 4242 // uncommon kind? just count them in bulk 4243 if (common_kind == -1) { 4244 stat->other_kinds.count++; 4245 stat->other_kinds.key_size += k.mv_size; 4246 stat->other_kinds.value_size += v.mv_size; 4247 } else { 4248 stat->common_kinds[common_kind].count++; 4249 stat->common_kinds[common_kind].key_size += k.mv_size; 4250 stat->common_kinds[common_kind].value_size += v.mv_size; 4251 } 4252 } 4253 4254 stat->dbs[i].count++; 4255 stat->dbs[i].key_size += k.mv_size; 4256 stat->dbs[i].value_size += v.mv_size; 4257 } 4258 4259 // close the cursor, they are per-dbi 4260 mdb_cursor_close(cur); 4261 } 4262 4263 ndb_end_query(&txn); 4264 4265 return 1; 4266 } 4267 4268 /// Push an element to the current tag 4269 /// 4270 /// Basic idea is to call ndb_builder_new_tag 4271 inline int ndb_builder_push_tag_str(struct ndb_builder *builder, 4272 const char *str, int len) 4273 { 4274 union ndb_packed_str pstr; 4275 int pack_ids = 1; 4276 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 4277 return 0; 4278 return ndb_builder_finalize_tag(builder, pstr); 4279 } 4280 4281 // 4282 // CONFIG 4283 // 4284 void ndb_default_config(struct ndb_config *config) 4285 { 4286 int cores = get_cpu_cores(); 4287 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 4288 config->ingester_threads = cores == -1 ? 4 : cores; 4289 config->flags = 0; 4290 config->ingest_filter = NULL; 4291 config->filter_context = NULL; 4292 } 4293 4294 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 4295 { 4296 config->ingester_threads = threads; 4297 } 4298 4299 void ndb_config_set_flags(struct ndb_config *config, int flags) 4300 { 4301 config->flags = flags; 4302 } 4303 4304 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 4305 { 4306 config->mapsize = mapsize; 4307 } 4308 4309 void ndb_config_set_ingest_filter(struct ndb_config *config, 4310 ndb_ingest_filter_fn fn, void *filter_ctx) 4311 { 4312 config->ingest_filter = fn; 4313 config->filter_context = filter_ctx; 4314 } 4315 4316 // used by ndb.c 4317 int ndb_print_search_keys(struct ndb_txn *txn) 4318 { 4319 MDB_cursor *cur; 4320 MDB_val k, v; 4321 int i; 4322 struct ndb_text_search_key search_key; 4323 4324 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 4325 return 0; 4326 4327 i = 1; 4328 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 4329 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 4330 fprintf(stderr, "error decoding key %d\n", i); 4331 continue; 4332 } 4333 4334 ndb_print_text_search_key(&search_key); 4335 printf("\n"); 4336 4337 i++; 4338 } 4339 4340 return 1; 4341 }