nostrdb.c (113574B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "sha256.h" 8 #include "lmdb.h" 9 #include "util.h" 10 #include "cpu.h" 11 #include "threadpool.h" 12 #include "protected_queue.h" 13 #include "memchr.h" 14 #include <stdlib.h> 15 #include <limits.h> 16 #include <assert.h> 17 18 #include "bindings/c/profile_json_parser.h" 19 #include "bindings/c/profile_builder.h" 20 #include "bindings/c/meta_builder.h" 21 #include "bindings/c/meta_reader.h" 22 #include "bindings/c/profile_verifier.h" 23 #include "secp256k1.h" 24 #include "secp256k1_ecdh.h" 25 #include "secp256k1_schnorrsig.h" 26 27 #define max(a,b) ((a) > (b) ? (a) : (b)) 28 #define min(a,b) ((a) < (b) ? (a) : (b)) 29 30 // the maximum number of things threads pop and push in bulk 31 static const int THREAD_QUEUE_BATCH = 4096; 32 33 // the maximum size of inbox queues 34 static const int DEFAULT_QUEUE_SIZE = 1000000; 35 36 // increase if we need bigger filters 37 #define NDB_FILTER_PAGES 64 38 39 #define ndb_flag_set(flags, f) ((flags & f) == f) 40 41 #define NDB_PARSED_ID (1 << 0) 42 #define NDB_PARSED_PUBKEY (1 << 1) 43 #define NDB_PARSED_SIG (1 << 2) 44 #define NDB_PARSED_CREATED_AT (1 << 3) 45 #define NDB_PARSED_KIND (1 << 4) 46 #define NDB_PARSED_CONTENT (1 << 5) 47 #define NDB_PARSED_TAGS (1 << 6) 48 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 49 50 typedef int (*ndb_migrate_fn)(struct ndb *); 51 typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, 52 int word_index); 53 54 // these must be byte-aligned, they are directly accessing the serialized data 55 // representation 56 #pragma pack(push, 1) 57 58 59 union ndb_packed_str { 60 struct { 61 char str[3]; 62 // we assume little endian everywhere. sorry not sorry. 
63 unsigned char flag; // NDB_PACKED_STR, etc 64 } packed; 65 66 uint32_t offset; 67 unsigned char bytes[4]; 68 }; 69 70 struct ndb_tag { 71 uint16_t count; 72 union ndb_packed_str strs[0]; 73 }; 74 75 struct ndb_tags { 76 uint16_t padding; 77 uint16_t count; 78 struct ndb_tag tag[0]; 79 }; 80 81 // v1 82 struct ndb_note { 83 unsigned char version; // v=1 84 unsigned char padding[3]; // keep things aligned 85 unsigned char id[32]; 86 unsigned char pubkey[32]; 87 unsigned char sig[64]; 88 89 uint64_t created_at; 90 uint32_t kind; 91 uint32_t content_length; 92 union ndb_packed_str content; 93 uint32_t strings; 94 // nothing can come after tags since it contains variadic data 95 struct ndb_tags tags; 96 }; 97 98 #pragma pack(pop) 99 100 struct ndb_migration { 101 ndb_migrate_fn fn; 102 }; 103 104 struct ndb_profile_record_builder { 105 flatcc_builder_t *builder; 106 void *flatbuf; 107 }; 108 109 // controls whether to continue or stop the json parser 110 enum ndb_idres { 111 NDB_IDRES_CONT, 112 NDB_IDRES_STOP, 113 }; 114 115 // closure data for the id-detecting ingest controller 116 struct ndb_ingest_controller 117 { 118 MDB_txn *read_txn; 119 struct ndb_lmdb *lmdb; 120 }; 121 122 enum ndb_writer_msgtype { 123 NDB_WRITER_QUIT, // kill thread immediately 124 NDB_WRITER_NOTE, // write a note to the db 125 NDB_WRITER_PROFILE, // write a profile to the db 126 NDB_WRITER_DBMETA, // write ndb metadata 127 NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched 128 }; 129 130 // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) 131 enum ndb_meta_key { 132 NDB_META_KEY_VERSION = 1 133 }; 134 135 struct ndb_json_parser { 136 const char *json; 137 int json_len; 138 struct ndb_builder builder; 139 jsmn_parser json_parser; 140 jsmntok_t *toks, *toks_end; 141 int i; 142 int num_tokens; 143 }; 144 145 // useful to pass to threads on its own 146 struct ndb_lmdb { 147 MDB_env *env; 148 MDB_dbi dbs[NDB_DBS]; 149 }; 150 151 struct ndb_writer { 152 
struct ndb_lmdb *lmdb; 153 154 void *queue_buf; 155 int queue_buflen; 156 pthread_t thread_id; 157 158 struct prot_queue inbox; 159 }; 160 161 struct ndb_ingester { 162 uint32_t flags; 163 struct threadpool tp; 164 struct ndb_writer *writer; 165 void *filter_context; 166 ndb_ingest_filter_fn filter; 167 }; 168 169 170 struct ndb { 171 struct ndb_lmdb lmdb; 172 struct ndb_ingester ingester; 173 struct ndb_writer writer; 174 int version; 175 uint32_t flags; // setting flags 176 // lmdb environ handles, etc 177 }; 178 179 180 // A clustered key with an id and a timestamp 181 struct ndb_tsid { 182 unsigned char id[32]; 183 uint64_t timestamp; 184 }; 185 186 // A u64 + timestamp id. Just using this for kinds at the moment. 187 struct ndb_u64_tsid { 188 uint64_t u64; // kind, etc 189 uint64_t timestamp; 190 }; 191 192 struct ndb_word 193 { 194 const char *word; 195 int word_len; 196 }; 197 198 struct ndb_search_words 199 { 200 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 201 int num_words; 202 }; 203 204 // ndb_text_search_key 205 // 206 // This is compressed when in lmdb: 207 // 208 // note_id: varint 209 // strlen: varint 210 // str: cstr 211 // timestamp: varint 212 // word_index: varint 213 // 214 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 215 int word_index, int word_len, const char *str, 216 uint64_t timestamp, uint64_t note_id, 217 int *keysize) 218 { 219 struct cursor cur; 220 int size, pad; 221 make_cursor(buf, buf + bufsize, &cur); 222 223 // TODO: need update this to uint64_t 224 // we push this first because our query function can pull this off 225 // quicky to check matches 226 if (!push_varint(&cur, (int32_t)note_id)) 227 return 0; 228 229 // string length 230 if (!push_varint(&cur, word_len)) 231 return 0; 232 233 // non-null terminated, lowercase string 234 if (!cursor_push_lowercase(&cur, str, word_len)) 235 return 0; 236 237 // TODO: need update this to uint64_t 238 if (!push_varint(&cur, (int)timestamp)) 239 return 0; 240 241 
// the index of the word in the content so that we can do more accurate 242 // phrase searches 243 if (!push_varint(&cur, word_index)) 244 return 0; 245 246 size = cur.p - cur.start; 247 248 // pad to 8-byte alignment 249 pad = ((size + 7) & ~7) - size; 250 if (pad > 0) { 251 if (!cursor_memset(&cur, 0, pad)) { 252 return 0; 253 } 254 } 255 256 *keysize = cur.p - cur.start; 257 assert((*keysize % 8) == 0); 258 259 return 1; 260 } 261 262 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 263 int wordlen, const char *word, 264 uint64_t timestamp, uint64_t note_id, 265 int *keysize) 266 { 267 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 268 timestamp, note_id, keysize); 269 } 270 271 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 272 int wordlen, const char *word, 273 int *keysize) 274 { 275 uint64_t timestamp, note_id; 276 timestamp = 0; 277 note_id = 0; 278 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 279 timestamp, note_id, keysize); 280 } 281 282 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 283 int wordlen, const char *word, 284 int *keysize) 285 { 286 uint64_t timestamp, note_id; 287 timestamp = INT32_MAX; 288 note_id = INT32_MAX; 289 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 290 timestamp, note_id, keysize); 291 } 292 293 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize); 294 295 /** From LMDB: Compare two items lexically */ 296 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 297 int diff; 298 ssize_t len_diff; 299 unsigned int len; 300 301 len = a->mv_size; 302 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 303 if (len_diff > 0) { 304 len = b->mv_size; 305 len_diff = 1; 306 } 307 308 diff = memcmp(a->mv_data, b->mv_data, len); 309 return diff ? diff : len_diff<0 ? 
-1 : len_diff; 310 } 311 312 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 313 { 314 struct cursor ca, cb; 315 int sa, sb; 316 int nid_a, nid_b; 317 MDB_val a2, b2; 318 319 make_cursor(a->mv_data, a->mv_data + a->mv_size, &ca); 320 make_cursor(b->mv_data, b->mv_data + b->mv_size, &cb); 321 322 // note_id 323 if (unlikely(!pull_varint(&ca, &nid_a) || !pull_varint(&cb, &nid_b))) 324 return 0; 325 326 // string size 327 if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb))) 328 return 0; 329 330 a2.mv_data = ca.p; 331 a2.mv_size = sa; 332 333 b2.mv_data = cb.p; 334 b2.mv_size = sb; 335 336 int cmp = mdb_cmp_memn(&a2, &b2); 337 if (cmp) return cmp; 338 339 // skip over string 340 ca.p += sa; 341 cb.p += sb; 342 343 // timestamp 344 if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb))) 345 return 0; 346 347 if (sa < sb) return -1; 348 else if (sa > sb) return 1; 349 350 // note_id 351 if (nid_a < nid_b) return -1; 352 else if (nid_a > nid_b) return 1; 353 354 // word index 355 if (unlikely(!pull_varint(&ca, &sa) || !pull_varint(&cb, &sb))) 356 return 0; 357 358 if (sa < sb) return -1; 359 else if (sa > sb) return 1; 360 361 return 0; 362 } 363 364 static inline int ndb_unpack_text_search_key_noteid( 365 struct cursor *cur, uint64_t *note_id) 366 { 367 int inote_id; 368 if (!pull_varint(cur, &inote_id)) 369 return 0; 370 371 *note_id = inote_id; 372 return 1; 373 } 374 375 // faster peek of just the string instead of unpacking everything 376 // this is used to quickly discard range query matches if there is no 377 // common prefix 378 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 379 const char **str, 380 int *str_len) 381 { 382 if (!pull_varint(cur, str_len)) 383 return 0; 384 385 *str = (const char *)cur->p; 386 387 if (!cursor_skip(cur, *str_len)) 388 return 0; 389 390 return 1; 391 } 392 393 // should be called after ndb_unpack_text_search_key_string. 
It continues 394 // the unpacking of a text search key if we've already started it. 395 static inline int 396 ndb_unpack_remaining_text_search_key(struct cursor *cur, 397 struct ndb_text_search_key *key) 398 { 399 int timestamp; 400 401 if (!pull_varint(cur, ×tamp)) 402 return 0; 403 404 if (!pull_varint(cur, &key->word_index)) 405 return 0; 406 407 key->timestamp = timestamp; 408 409 return 1; 410 } 411 412 // unpack a fulltext search key 413 // 414 // full version of string + unpack remaining. This is split up because text 415 // searching only requires to pull the string for prefix searching, and the 416 // remaining is optional 417 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 418 struct ndb_text_search_key *key) 419 { 420 struct cursor c; 421 make_cursor(p, p + len, &c); 422 423 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 424 return 0; 425 426 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 427 return 0; 428 429 return ndb_unpack_remaining_text_search_key(&c, key); 430 } 431 432 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 433 // `dst` and `src` are pointers to the destination and source strings, respectively. 434 // `n` is the maximum number of characters to copy. 
435 static void lowercase_strncpy(char *dst, const char *src, int n) { 436 int j = 0, i = 0; 437 438 if (!dst || !src || n == 0) { 439 return; 440 } 441 442 while (src[i] != '\0' && j < n) { 443 dst[j++] = tolower(src[i++]); 444 } 445 446 // Null-terminate and fill the destination string 447 while (j < n) { 448 dst[j++] = '\0'; 449 } 450 } 451 452 int ndb_filter_init(struct ndb_filter *filter) 453 { 454 struct cursor cur; 455 int page_size, elem_pages, data_pages, buf_size; 456 457 page_size = 4096; // assuming this, not a big deal if we're wrong 458 elem_pages = NDB_FILTER_PAGES / 4; 459 data_pages = NDB_FILTER_PAGES - elem_pages; 460 buf_size = page_size * NDB_FILTER_PAGES; 461 462 unsigned char *buf = malloc(buf_size); 463 if (!buf) 464 return 0; 465 466 // init memory arena for the cursor 467 make_cursor(buf, buf + buf_size, &cur); 468 469 cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages); 470 cursor_slice(&cur, &filter->data_buf, page_size * data_pages); 471 472 // make sure we are fully allocated 473 assert(cur.p == cur.end); 474 475 // make sure elem_buf is the start of the buffer 476 assert(filter->elem_buf.start == cur.start); 477 478 filter->num_elements = 0; 479 filter->elements[0] = (struct ndb_filter_elements*) buf; 480 filter->current = NULL; 481 482 return 1; 483 } 484 485 void ndb_filter_reset(struct ndb_filter *filter) 486 { 487 filter->num_elements = 0; 488 filter->elem_buf.p = filter->elem_buf.start; 489 filter->data_buf.p = filter->data_buf.start; 490 filter->current = NULL; 491 } 492 493 void ndb_filter_free(struct ndb_filter *filter) 494 { 495 if (filter->elem_buf.start) 496 free(filter->elem_buf.start); 497 498 memset(filter, 0, sizeof(*filter)); 499 } 500 501 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 502 { 503 switch (field) { 504 case NDB_FILTER_IDS: return "ids"; 505 case NDB_FILTER_AUTHORS: return "authors"; 506 case NDB_FILTER_KINDS: return "kinds"; 507 case NDB_FILTER_GENERIC: return 
"generic"; 508 case NDB_FILTER_SINCE: return "since"; 509 case NDB_FILTER_UNTIL: return "until"; 510 case NDB_FILTER_LIMIT: return "limit"; 511 } 512 513 return "unknown"; 514 } 515 516 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char generic) 517 { 518 int i; 519 struct ndb_filter_elements *els, *el; 520 521 if (filter->current) { 522 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 523 return 0; 524 } 525 526 // you can only start and end fields once 527 for (i = 0; i < filter->num_elements; i++) { 528 el = filter->elements[i]; 529 if (el->field.type == field) { 530 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 531 ndb_filter_field_name(field)); 532 return 0; 533 } 534 } 535 536 els = (struct ndb_filter_elements *) filter->elem_buf.p ; 537 filter->current = els; 538 539 // advance elem buffer to the variable data section 540 if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 541 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 542 ndb_filter_field_name(field)); 543 return 0; 544 } 545 546 els->field.type = field; 547 els->field.generic = generic; 548 els->field.elem_type = 0; 549 els->count = 0; 550 551 return 1; 552 } 553 554 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 555 { 556 return ndb_filter_start_field_impl(filter, field, 0); 557 } 558 559 int ndb_filter_start_generic_field(struct ndb_filter *filter, char tag) 560 { 561 return ndb_filter_start_field_impl(filter, NDB_FILTER_GENERIC, tag); 562 } 563 564 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 565 { 566 unsigned char *data; 567 const char *str; 568 569 if (!filter->current) 570 return 0; 571 572 data = filter->data_buf.p; 573 574 switch (filter->current->field.type) { 575 case NDB_FILTER_IDS: 576 case 
NDB_FILTER_AUTHORS: 577 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 578 return 0; 579 el.id = data; 580 break; 581 case NDB_FILTER_KINDS: 582 break; 583 case NDB_FILTER_SINCE: 584 case NDB_FILTER_UNTIL: 585 case NDB_FILTER_LIMIT: 586 // only one allowed for since/until 587 if (filter->current->count != 0) 588 return 0; 589 break; 590 case NDB_FILTER_GENERIC: 591 str = (const char *)filter->data_buf.p; 592 if (!cursor_push_c_str(&filter->data_buf, el.string)) 593 return 0; 594 // push a pointer of the string in the databuf as an element 595 el.string = str; 596 break; 597 } 598 599 if (!cursor_push(&filter->elem_buf, (unsigned char*)&el, sizeof(el))) 600 return 0; 601 602 filter->current->count++; 603 604 return 1; 605 } 606 607 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 608 enum ndb_generic_element_type elem_type) 609 { 610 enum ndb_generic_element_type current_elem_type; 611 612 if (!filter->current) 613 return 0; 614 615 current_elem_type = filter->current->field.elem_type; 616 617 // element types must be uniform 618 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 619 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 620 return 0; 621 } 622 623 filter->current->field.elem_type = elem_type; 624 625 return 1; 626 } 627 628 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 629 { 630 union ndb_filter_element el; 631 632 if (!filter->current) 633 return 0; 634 635 // only generic queries are allowed to have strings 636 switch (filter->current->field.type) { 637 case NDB_FILTER_SINCE: 638 case NDB_FILTER_UNTIL: 639 case NDB_FILTER_LIMIT: 640 case NDB_FILTER_IDS: 641 case NDB_FILTER_AUTHORS: 642 case NDB_FILTER_KINDS: 643 return 0; 644 case NDB_FILTER_GENERIC: 645 break; 646 } 647 648 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 649 return 0; 650 651 el.string = str; 652 return ndb_filter_add_element(filter, el); 653 } 654 655 
int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 656 { 657 union ndb_filter_element el; 658 if (!filter->current) 659 return 0; 660 661 switch (filter->current->field.type) { 662 case NDB_FILTER_IDS: 663 case NDB_FILTER_AUTHORS: 664 case NDB_FILTER_GENERIC: 665 return 0; 666 case NDB_FILTER_KINDS: 667 case NDB_FILTER_SINCE: 668 case NDB_FILTER_UNTIL: 669 case NDB_FILTER_LIMIT: 670 break; 671 } 672 673 el.integer = integer; 674 675 return ndb_filter_add_element(filter, el); 676 } 677 678 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 679 { 680 union ndb_filter_element el; 681 682 if (!filter->current) 683 return 0; 684 685 // only certain filter types allow pushing id elements 686 switch (filter->current->field.type) { 687 case NDB_FILTER_SINCE: 688 case NDB_FILTER_UNTIL: 689 case NDB_FILTER_LIMIT: 690 return 0; 691 case NDB_FILTER_IDS: 692 case NDB_FILTER_AUTHORS: 693 case NDB_FILTER_KINDS: 694 case NDB_FILTER_GENERIC: 695 break; 696 } 697 698 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 699 return 0; 700 701 // this is needed so that generic filters know its an id 702 el.id = id; 703 704 return ndb_filter_add_element(filter, el); 705 } 706 707 // TODO: build a hashtable so this is O(1) 708 static int ndb_generic_filter_matches(struct ndb_filter_elements *els, 709 struct ndb_note *note) 710 { 711 int i; 712 union ndb_filter_element el; 713 struct ndb_iterator iter, *it = &iter; 714 struct ndb_str str; 715 716 ndb_tags_iterate_start(note, it); 717 718 while (ndb_tags_iterate_next(it)) { 719 // we're looking for tags with 2 or more entries: ["p", id], etc 720 if (it->tag->count < 2) 721 continue; 722 723 str = ndb_tag_str(note, it->tag, 0); 724 725 // we only care about packed strings (single char, etc) 726 if (str.flag != NDB_PACKED_STR) 727 continue; 728 729 // do we have #e matching e (or p, etc) 730 if (str.str[0] != els->field.generic || str.str[1] != 0) 731 continue; 732 733 str = 
ndb_tag_str(note, it->tag, 1); 734 735 switch (els->field.elem_type) { 736 case NDB_ELEMENT_ID: 737 // if our filter element type is an id, then we 738 // expect a packed id in the tag, otherwise skip 739 if (str.flag != NDB_PACKED_ID) 740 continue; 741 break; 742 case NDB_ELEMENT_STRING: 743 // if our filter element type is a string, then 744 // we should not expect an id 745 if (str.flag == NDB_PACKED_ID) 746 continue; 747 break; 748 case NDB_ELEMENT_UNKNOWN: 749 default: 750 // For some reason the element type is not set. It's 751 // possible nothing was added to the generic filter? 752 // Let's just fail here and log a note for debugging 753 fprintf(stderr, "UNUSUAL ndb_generic_filter_matches: have unknown element type %d\n", els->field.elem_type); 754 return 0; 755 } 756 757 for (i = 0; i < els->count; i++) { 758 el = els->elements[i]; 759 switch (els->field.elem_type) { 760 case NDB_ELEMENT_ID: 761 if (!memcmp(el.id, str.id, 32)) 762 return 1; 763 break; 764 case NDB_ELEMENT_STRING: 765 if (!strcmp(el.string, str.str)) 766 return 1; 767 break; 768 case NDB_ELEMENT_UNKNOWN: 769 return 0; 770 } 771 } 772 } 773 774 return 0; 775 } 776 777 // returns 1 if a filter matches a note 778 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 779 { 780 int i, j; 781 struct ndb_filter_elements *els; 782 783 for (i = 0; i < filter->num_elements; i++) { 784 els = filter->elements[i]; 785 786 switch (els->field.type) { 787 case NDB_FILTER_KINDS: 788 for (j = 0; j < els->count; j++) { 789 if ((unsigned int)els->elements[j].integer == note->kind) 790 goto cont; 791 } 792 break; 793 // TODO: add filter hashtable for large id lists 794 case NDB_FILTER_IDS: 795 for (j = 0; j < els->count; j++) { 796 if (!memcmp(els->elements[j].id, note->id, 32)) 797 goto cont; 798 } 799 break; 800 case NDB_FILTER_AUTHORS: 801 for (j = 0; j < els->count; j++) { 802 if (!memcmp(els->elements[j].id, note->pubkey, 32)) 803 goto cont; 804 } 805 break; 806 case NDB_FILTER_GENERIC: 
807 if (ndb_generic_filter_matches(els, note)) 808 continue; 809 break; 810 case NDB_FILTER_SINCE: 811 assert(els->count == 1); 812 if (note->created_at >= els->elements[0].integer) 813 continue; 814 break; 815 case NDB_FILTER_UNTIL: 816 assert(els->count == 1); 817 if (note->created_at < els->elements[0].integer) 818 continue; 819 case NDB_FILTER_LIMIT: 820 cont: 821 continue; 822 } 823 824 // all need to match 825 return 0; 826 } 827 828 return 1; 829 } 830 831 void ndb_filter_end_field(struct ndb_filter *filter) 832 { 833 filter->elements[filter->num_elements++] = filter->current; 834 filter->current = NULL; 835 } 836 837 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 838 uint64_t timestamp, const char *search) 839 { 840 memcpy(key->id, id, 32); 841 key->timestamp = timestamp; 842 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 843 key->search[sizeof(key->search) - 1] = '\0'; 844 } 845 846 static int ndb_write_profile_search_index(struct ndb_txn *txn, 847 struct ndb_search_key *index_key, 848 uint64_t profile_key) 849 { 850 int rc; 851 MDB_val key, val; 852 853 key.mv_data = index_key; 854 key.mv_size = sizeof(*index_key); 855 val.mv_data = &profile_key; 856 val.mv_size = sizeof(profile_key); 857 858 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 859 &key, &val, 0))) 860 { 861 ndb_debug("ndb_write_profile_search_index failed: %s\n", 862 mdb_strerror(rc)); 863 return 0; 864 } 865 866 return 1; 867 } 868 869 870 // map usernames and display names to profile keys for user searching 871 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 872 struct ndb_note *note, 873 uint64_t profile_key, 874 void *profile_root) 875 { 876 struct ndb_search_key index; 877 NdbProfileRecord_table_t profile_record; 878 NdbProfile_table_t profile; 879 880 profile_record = NdbProfileRecord_as_root(profile_root); 881 profile = NdbProfileRecord_profile_get(profile_record); 882 883 const char *name = 
NdbProfile_name_get(profile); 884 const char *display_name = NdbProfile_display_name_get(profile); 885 886 // words + pubkey + created 887 if (name) { 888 ndb_make_search_key(&index, note->pubkey, note->created_at, 889 name); 890 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 891 return 0; 892 } 893 894 if (display_name) { 895 // don't write the same name/display_name twice 896 if (name && !strcmp(display_name, name)) { 897 return 1; 898 } 899 ndb_make_search_key(&index, note->pubkey, note->created_at, 900 display_name); 901 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 902 return 0; 903 } 904 905 return 1; 906 } 907 908 909 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 910 { 911 txn->lmdb = &ndb->lmdb; 912 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 913 if (!txn->lmdb->env) 914 return 0; 915 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 916 } 917 918 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 919 { 920 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 921 } 922 923 // this should only be used in migrations, etc 924 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 925 { 926 return _ndb_begin_query(ndb, txn, 0); 927 } 928 929 930 // Migrations 931 // 932 933 static int ndb_migrate_user_search_indices(struct ndb *ndb) 934 { 935 int rc; 936 MDB_cursor *cur; 937 MDB_val k, v; 938 void *profile_root; 939 NdbProfileRecord_table_t record; 940 struct ndb_txn txn; 941 struct ndb_note *note; 942 uint64_t note_key, profile_key; 943 size_t len; 944 int count; 945 946 if (!ndb_begin_rw_query(ndb, &txn)) { 947 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n"); 948 return 0; 949 } 950 951 if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { 952 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 953 return 0; 954 } 955 956 count = 0; 957 958 // loop through all profiles 
and write search indices 959 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 960 profile_root = v.mv_data; 961 profile_key = *((uint64_t*)k.mv_data); 962 record = NdbProfileRecord_as_root(profile_root); 963 note_key = NdbProfileRecord_note_key(record); 964 note = ndb_get_note_by_key(&txn, note_key, &len); 965 966 if (note == NULL) { 967 fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n"); 968 return 0; 969 } 970 971 if (!ndb_write_profile_search_indices(&txn, note, profile_key, 972 profile_root)) { 973 974 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 975 return 0; 976 } 977 978 count++; 979 } 980 981 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 982 983 mdb_cursor_close(cur); 984 985 ndb_end_query(&txn); 986 987 return 1; 988 } 989 990 static int ndb_migrate_lower_user_search_indices(struct ndb *ndb) 991 { 992 MDB_txn *txn; 993 994 if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) { 995 fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n"); 996 return 0; 997 } 998 999 // just drop the search db so we can rebuild it 1000 if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) { 1001 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 1002 return 0; 1003 } 1004 1005 mdb_txn_commit(txn); 1006 1007 return ndb_migrate_user_search_indices(ndb); 1008 } 1009 1010 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 1011 1012 1013 int ndb_db_version(struct ndb *ndb) 1014 { 1015 int rc; 1016 uint64_t version, version_key; 1017 MDB_val k, v; 1018 MDB_txn *txn; 1019 1020 version_key = NDB_META_KEY_VERSION; 1021 k.mv_data = &version_key; 1022 k.mv_size = sizeof(version_key); 1023 1024 if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) { 1025 fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc); 1026 return -1; 1027 } 1028 1029 if (mdb_get(txn, 
ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) { 1030 version = -1; 1031 } else { 1032 if (v.mv_size != 8) { 1033 fprintf(stderr, "run_migrations: invalid version size?"); 1034 return 0; 1035 } 1036 version = *((uint64_t*)v.mv_data); 1037 } 1038 1039 mdb_txn_abort(txn); 1040 return version; 1041 } 1042 1043 // custom kind+timestamp comparison function. This is used by lmdb to perform 1044 // b+ tree searches over the kind+timestamp index 1045 static int ndb_u64_tsid_compare(const MDB_val *a, const MDB_val *b) 1046 { 1047 struct ndb_u64_tsid *tsa, *tsb; 1048 tsa = a->mv_data; 1049 tsb = b->mv_data; 1050 1051 if (tsa->u64 < tsb->u64) 1052 return -1; 1053 else if (tsa->u64 > tsb->u64) 1054 return 1; 1055 1056 if (tsa->timestamp < tsb->timestamp) 1057 return -1; 1058 else if (tsa->timestamp > tsb->timestamp) 1059 return 1; 1060 1061 return 0; 1062 } 1063 1064 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1065 { 1066 struct ndb_tsid *tsa, *tsb; 1067 MDB_val a2 = *a, b2 = *b; 1068 1069 a2.mv_size = sizeof(tsa->id); 1070 b2.mv_size = sizeof(tsb->id); 1071 1072 int cmp = mdb_cmp_memn(&a2, &b2); 1073 if (cmp) return cmp; 1074 1075 tsa = a->mv_data; 1076 tsb = b->mv_data; 1077 1078 if (tsa->timestamp < tsb->timestamp) 1079 return -1; 1080 else if (tsa->timestamp > tsb->timestamp) 1081 return 1; 1082 return 0; 1083 } 1084 1085 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1086 { 1087 memcpy(key->id, id, 32); 1088 key->timestamp = 0; 1089 } 1090 1091 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1092 uint64_t timestamp) 1093 { 1094 memcpy(key->id, id, 32); 1095 key->timestamp = timestamp; 1096 } 1097 1098 static inline void ndb_u64_tsid_init(struct ndb_u64_tsid *key, uint64_t integer, 1099 uint64_t timestamp) 1100 { 1101 key->u64 = integer; 1102 key->timestamp = timestamp; 1103 } 1104 1105 // useful for range-searching for the latest key with a clustered created_at timen 1106 static inline void 
ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1107 { 1108 memcpy(key->id, id, 32); 1109 key->timestamp = UINT64_MAX; 1110 } 1111 1112 enum ndb_ingester_msgtype { 1113 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1114 NDB_INGEST_QUIT, // kill ingester thread immediately 1115 }; 1116 1117 struct ndb_ingester_event { 1118 char *json; 1119 unsigned client : 1; // ["EVENT", {...}] messages 1120 unsigned len : 31; 1121 }; 1122 1123 struct ndb_writer_note { 1124 struct ndb_note *note; 1125 size_t note_len; 1126 }; 1127 1128 struct ndb_writer_profile { 1129 struct ndb_writer_note note; 1130 struct ndb_profile_record_builder record; 1131 }; 1132 1133 struct ndb_ingester_msg { 1134 enum ndb_ingester_msgtype type; 1135 union { 1136 struct ndb_ingester_event event; 1137 }; 1138 }; 1139 1140 struct ndb_writer_ndb_meta { 1141 // these are 64 bit because I'm paranoid of db-wide alignment issues 1142 uint64_t version; 1143 }; 1144 1145 // Used in the writer thread when writing ndb_profile_fetch_record's 1146 // kv = pubkey: recor 1147 struct ndb_writer_last_fetch { 1148 unsigned char pubkey[32]; 1149 uint64_t fetched_at; 1150 }; 1151 1152 // The different types of messages that the writer thread can write to the 1153 // database 1154 struct ndb_writer_msg { 1155 enum ndb_writer_msgtype type; 1156 union { 1157 struct ndb_writer_note note; 1158 struct ndb_writer_profile profile; 1159 struct ndb_writer_ndb_meta ndb_meta; 1160 struct ndb_writer_last_fetch last_fetch; 1161 }; 1162 }; 1163 1164 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 1165 struct ndb_writer_msg *msg) 1166 { 1167 return prot_queue_push(&writer->inbox, msg); 1168 } 1169 1170 static int ndb_migrate_utf8_profile_names(struct ndb *ndb) 1171 { 1172 int rc; 1173 MDB_cursor *cur; 1174 MDB_val k, v; 1175 void *profile_root; 1176 NdbProfileRecord_table_t record; 1177 struct ndb_txn txn; 1178 struct ndb_note *note, *copied_note; 1179 uint64_t note_key; 1180 size_t 
len; 1181 int count, failed; 1182 struct ndb_writer_msg out; 1183 1184 if (!ndb_begin_rw_query(ndb, &txn)) { 1185 fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n"); 1186 return 0; 1187 } 1188 1189 if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { 1190 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 1191 return 0; 1192 } 1193 1194 count = 0; 1195 failed = 0; 1196 1197 // loop through all profiles and write search indices 1198 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1199 profile_root = v.mv_data; 1200 record = NdbProfileRecord_as_root(profile_root); 1201 note_key = NdbProfileRecord_note_key(record); 1202 note = ndb_get_note_by_key(&txn, note_key, &len); 1203 1204 if (note == NULL) { 1205 fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n"); 1206 return 0; 1207 } 1208 1209 struct ndb_profile_record_builder *b = &out.profile.record; 1210 1211 // reprocess profile 1212 if (!ndb_process_profile_note(note, b)) { 1213 failed++; 1214 continue; 1215 } 1216 1217 // the writer needs to own this note, and its expected to free it 1218 copied_note = malloc(len); 1219 memcpy(copied_note, note, len); 1220 1221 out.type = NDB_WRITER_PROFILE; 1222 out.profile.note.note = copied_note; 1223 out.profile.note.note_len = len; 1224 1225 ndb_writer_queue_msg(&ndb->writer, &out); 1226 1227 count++; 1228 } 1229 1230 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 1231 1232 if (failed != 0) { 1233 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 1234 } 1235 1236 mdb_cursor_close(cur); 1237 1238 ndb_end_query(&txn); 1239 1240 return 1; 1241 } 1242 1243 static struct ndb_migration MIGRATIONS[] = { 1244 { .fn = ndb_migrate_user_search_indices }, 1245 { .fn = ndb_migrate_lower_user_search_indices }, 1246 { .fn = ndb_migrate_utf8_profile_names } 1247 }; 1248 1249 1250 int ndb_end_query(struct ndb_txn 
*txn) 1251 { 1252 // this works on read or write queries. 1253 return mdb_txn_commit(txn->mdb_txn) == 0; 1254 } 1255 1256 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 1257 unsigned char sig[64]) 1258 { 1259 secp256k1_xonly_pubkey xonly_pubkey; 1260 int ok; 1261 1262 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 1263 pubkey) != 0; 1264 if (!ok) return 0; 1265 1266 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 1267 &xonly_pubkey) > 0; 1268 if (!ok) return 0; 1269 1270 return 1; 1271 } 1272 1273 static inline int ndb_writer_queue_msgs(struct ndb_writer *writer, 1274 struct ndb_writer_msg *msgs, 1275 int num_msgs) 1276 { 1277 return prot_queue_push_all(&writer->inbox, msgs, num_msgs); 1278 } 1279 1280 static int ndb_writer_queue_note(struct ndb_writer *writer, 1281 struct ndb_note *note, size_t note_len) 1282 { 1283 struct ndb_writer_msg msg; 1284 msg.type = NDB_WRITER_NOTE; 1285 1286 msg.note.note = note; 1287 msg.note.note_len = note_len; 1288 1289 return prot_queue_push(&writer->inbox, &msg); 1290 } 1291 1292 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 1293 const unsigned char *pubkey, 1294 uint64_t fetched_at) 1295 { 1296 int rc; 1297 MDB_val key, val; 1298 1299 key.mv_data = (unsigned char*)pubkey; 1300 key.mv_size = 32; 1301 val.mv_data = &fetched_at; 1302 val.mv_size = sizeof(fetched_at); 1303 1304 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 1305 &key, &val, 0))) 1306 { 1307 ndb_debug("write version to ndb_meta failed: %s\n", 1308 mdb_strerror(rc)); 1309 return; 1310 } 1311 1312 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 1313 } 1314 1315 1316 // We just received a profile that we haven't processed yet, but it could 1317 // be an older one! Make sure we only write last fetched profile if it's a new 1318 // one 1319 // 1320 // To do this, we first check the latest profile in the database. 
If the 1321 // created_date for this profile note is newer, then we write a 1322 // last_profile_fetch record, otherwise we do not. 1323 // 1324 // WARNING: This function is only valid when called from the writer thread 1325 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 1326 struct ndb_note *note) 1327 { 1328 size_t len; 1329 uint64_t profile_key, note_key; 1330 void *root; 1331 struct ndb_note *last_profile; 1332 NdbProfileRecord_table_t record; 1333 1334 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 1335 record = NdbProfileRecord_as_root(root); 1336 note_key = NdbProfileRecord_note_key(record); 1337 last_profile = ndb_get_note_by_key(txn, note_key, &len); 1338 if (last_profile == NULL) { 1339 return 0; 1340 } 1341 1342 // found profile, let's see if it's newer than ours 1343 if (note->created_at > last_profile->created_at) { 1344 // this is a new profile note, record last fetched time 1345 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1346 } 1347 } else { 1348 // couldn't fetch profile. 
record last fetched time 1349 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1350 } 1351 1352 return 1; 1353 } 1354 1355 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 1356 uint64_t fetched_at) 1357 { 1358 struct ndb_writer_msg msg; 1359 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 1360 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 1361 msg.last_fetch.fetched_at = fetched_at; 1362 1363 return ndb_writer_queue_msg(&ndb->writer, &msg); 1364 } 1365 1366 // get some value based on a clustered id key 1367 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 1368 MDB_val *val) 1369 { 1370 MDB_val k, v; 1371 MDB_cursor *cur; 1372 int success = 0, rc; 1373 struct ndb_tsid tsid; 1374 1375 // position at the most recent 1376 ndb_tsid_high(&tsid, id); 1377 1378 k.mv_data = &tsid; 1379 k.mv_size = sizeof(tsid); 1380 1381 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 1382 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 1383 return 0; 1384 } 1385 1386 // Position cursor at the next key greater than or equal to the specified key 1387 if (mdb_cursor_get(cur, &k, &v, MDB_SET_RANGE)) { 1388 // Failed :(. It could be the last element? 
1389 if (mdb_cursor_get(cur, &k, &v, MDB_LAST)) 1390 goto cleanup; 1391 } else { 1392 // if set range worked and our key exists, it should be 1393 // the one right before this one 1394 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 1395 goto cleanup; 1396 } 1397 1398 if (memcmp(k.mv_data, id, 32) == 0) { 1399 *val = v; 1400 success = 1; 1401 } 1402 1403 cleanup: 1404 mdb_cursor_close(cur); 1405 return success; 1406 } 1407 1408 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 1409 enum ndb_dbs store, size_t *len) 1410 { 1411 MDB_val k, v; 1412 1413 k.mv_data = &key; 1414 k.mv_size = sizeof(key); 1415 1416 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 1417 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 1418 return NULL; 1419 } 1420 1421 if (len) 1422 *len = v.mv_size; 1423 1424 return v.mv_data; 1425 } 1426 1427 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 1428 enum ndb_dbs store, const unsigned char *pk, 1429 size_t *len, uint64_t *primkey) 1430 { 1431 MDB_val k, v; 1432 void *res = NULL; 1433 if (len) 1434 *len = 0; 1435 1436 if (!ndb_get_tsid(txn, ind, pk, &k)) { 1437 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 1438 return 0; 1439 } 1440 1441 if (primkey) 1442 *primkey = *(uint64_t*)k.mv_data; 1443 1444 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 1445 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 1446 return 0; 1447 } 1448 1449 res = v.mv_data; 1450 assert(((uint64_t)res % 4) == 0); 1451 if (len) 1452 *len = v.mv_size; 1453 return res; 1454 } 1455 1456 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 1457 { 1458 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 1459 } 1460 1461 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 1462 { 1463 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, 
key); 1464 } 1465 1466 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 1467 enum ndb_dbs db, 1468 const unsigned char *id) 1469 { 1470 MDB_val k; 1471 1472 if (!ndb_get_tsid(txn, db, id, &k)) 1473 return 0; 1474 1475 return *(uint32_t*)k.mv_data; 1476 } 1477 1478 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 1479 { 1480 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 1481 } 1482 1483 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 1484 { 1485 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 1486 } 1487 1488 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 1489 { 1490 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 1491 } 1492 1493 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 1494 { 1495 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 1496 } 1497 1498 uint64_t 1499 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 1500 { 1501 MDB_val k, v; 1502 1503 k.mv_data = (unsigned char*)pubkey; 1504 k.mv_size = 32; 1505 1506 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 1507 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 1508 return 0; 1509 } 1510 1511 return *((uint64_t*)v.mv_data); 1512 } 1513 1514 1515 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 1516 { 1517 MDB_val val; 1518 1519 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 1520 return 0; 1521 1522 return 1; 1523 } 1524 1525 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 1526 MDB_txn *mdb_txn) 1527 { 1528 txn->lmdb = lmdb; 1529 txn->mdb_txn = mdb_txn; 1530 } 1531 1532 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 1533 { 1534 unsigned char id[32]; 1535 struct ndb_ingest_controller *c = data; 1536 struct ndb_txn txn; 1537 1538 hex_decode(hexid, 64, id, sizeof(id)); 1539 
1540 // let's see if we already have it 1541 1542 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 1543 if (!ndb_has_note(&txn, id)) 1544 return NDB_IDRES_CONT; 1545 1546 return NDB_IDRES_STOP; 1547 } 1548 1549 static int ndbprofile_parse_json(flatcc_builder_t *B, 1550 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 1551 { 1552 flatcc_json_parser_t parser, *ctx = &parser; 1553 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 1554 1555 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 1556 return 0; 1557 1558 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 1559 if (ctx->error) 1560 return 0; 1561 1562 if (!flatcc_builder_end_buffer(B, *profile)) 1563 return 0; 1564 1565 ctx->end_loc = buf; 1566 1567 1568 return 1; 1569 } 1570 1571 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 1572 { 1573 b->builder = malloc(sizeof(*b->builder)); 1574 b->flatbuf = NULL; 1575 } 1576 1577 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 1578 { 1579 if (b->builder) 1580 free(b->builder); 1581 if (b->flatbuf) 1582 free(b->flatbuf); 1583 1584 b->builder = NULL; 1585 b->flatbuf = NULL; 1586 } 1587 1588 int ndb_process_profile_note(struct ndb_note *note, 1589 struct ndb_profile_record_builder *profile) 1590 { 1591 int res; 1592 1593 NdbProfile_ref_t profile_table; 1594 flatcc_builder_t *builder; 1595 1596 ndb_profile_record_builder_init(profile); 1597 builder = profile->builder; 1598 flatcc_builder_init(builder); 1599 1600 NdbProfileRecord_start_as_root(builder); 1601 1602 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 1603 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 1604 note->content_length, 1605 flatcc_json_parser_f_skip_unknown, 1606 &profile_table))) 1607 { 1608 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 1609 note->content_length, ndb_note_content(note)); 1610 ndb_profile_record_builder_free(profile); 1611 return 0; 1612 } 
1613 1614 uint64_t received_at = time(NULL); 1615 const char *lnurl = "fixme"; 1616 1617 NdbProfileRecord_profile_add(builder, profile_table); 1618 NdbProfileRecord_received_at_add(builder, received_at); 1619 1620 flatcc_builder_ref_t lnurl_off; 1621 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 1622 1623 NdbProfileRecord_lnurl_add(builder, lnurl_off); 1624 1625 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 1626 return 1; 1627 } 1628 1629 static int ndb_ingester_process_note(secp256k1_context *ctx, 1630 struct ndb_note *note, 1631 size_t note_size, 1632 struct ndb_writer_msg *out, 1633 struct ndb_ingester *ingester) 1634 { 1635 enum ndb_ingest_filter_action action; 1636 action = NDB_INGEST_ACCEPT; 1637 1638 if (ingester->filter) 1639 action = ingester->filter(ingester->filter_context, note); 1640 1641 if (action == NDB_INGEST_REJECT) 1642 return 0; 1643 1644 // some special situations we might want to skip sig validation, 1645 // like during large imports 1646 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 1647 // if we're skipping validation we don't need to verify 1648 } else { 1649 // verify! If it's an invalid note we don't need to 1650 // bother writing it to the database 1651 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 1652 ndb_debug("signature verification failed\n"); 1653 return 0; 1654 } 1655 } 1656 1657 // we didn't find anything. 
let's send it 1658 // to the writer thread 1659 note = realloc(note, note_size); 1660 assert(((uint64_t)note % 4) == 0); 1661 1662 if (note->kind == 0) { 1663 struct ndb_profile_record_builder *b = 1664 &out->profile.record; 1665 1666 ndb_process_profile_note(note, b); 1667 1668 out->type = NDB_WRITER_PROFILE; 1669 out->profile.note.note = note; 1670 out->profile.note.note_len = note_size; 1671 } else { 1672 out->type = NDB_WRITER_NOTE; 1673 out->note.note = note; 1674 out->note.note_len = note_size; 1675 } 1676 1677 return 1; 1678 } 1679 1680 1681 static int ndb_ingester_process_event(secp256k1_context *ctx, 1682 struct ndb_ingester *ingester, 1683 struct ndb_ingester_event *ev, 1684 struct ndb_writer_msg *out, 1685 MDB_txn *read_txn 1686 ) 1687 { 1688 struct ndb_tce tce; 1689 struct ndb_fce fce; 1690 struct ndb_note *note; 1691 struct ndb_ingest_controller controller; 1692 struct ndb_id_cb cb; 1693 void *buf; 1694 int ok; 1695 size_t bufsize, note_size; 1696 1697 ok = 0; 1698 1699 // we will use this to check if we already have it in the DB during 1700 // ID parsing 1701 controller.read_txn = read_txn; 1702 controller.lmdb = ingester->writer->lmdb; 1703 cb.fn = ndb_ingester_json_controller; 1704 cb.data = &controller; 1705 1706 // since we're going to be passing this allocated note to a different 1707 // thread, we can't use thread-local buffers. just allocate a block 1708 bufsize = max(ev->len * 8.0, 4096); 1709 buf = malloc(bufsize); 1710 if (!buf) { 1711 ndb_debug("couldn't malloc buf\n"); 1712 return 0; 1713 } 1714 1715 note_size = 1716 ev->client ? 1717 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 1718 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 1719 1720 if ((int)note_size == -42) { 1721 // we already have this! 
1722 //ndb_debug("already have id??\n"); 1723 goto cleanup; 1724 } else if (note_size == 0) { 1725 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 1726 goto cleanup; 1727 } 1728 1729 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 1730 1731 if (ev->client) { 1732 switch (fce.evtype) { 1733 case NDB_FCE_EVENT: 1734 note = fce.event.note; 1735 if (note != buf) { 1736 ndb_debug("note buffer not equal to malloc'd buffer\n"); 1737 goto cleanup; 1738 } 1739 1740 if (!ndb_ingester_process_note(ctx, note, note_size, 1741 out, ingester)) { 1742 goto cleanup; 1743 } else { 1744 // we're done with the original json, free it 1745 free(ev->json); 1746 return 1; 1747 } 1748 } 1749 } else { 1750 switch (tce.evtype) { 1751 case NDB_TCE_NOTICE: goto cleanup; 1752 case NDB_TCE_EOSE: goto cleanup; 1753 case NDB_TCE_OK: goto cleanup; 1754 case NDB_TCE_EVENT: 1755 note = tce.event.note; 1756 if (note != buf) { 1757 ndb_debug("note buffer not equal to malloc'd buffer\n"); 1758 goto cleanup; 1759 } 1760 1761 if (!ndb_ingester_process_note(ctx, note, note_size, 1762 out, ingester)) { 1763 goto cleanup; 1764 } else { 1765 // we're done with the original json, free it 1766 free(ev->json); 1767 return 1; 1768 } 1769 } 1770 } 1771 1772 1773 cleanup: 1774 free(ev->json); 1775 free(buf); 1776 1777 return ok; 1778 } 1779 1780 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 1781 { 1782 MDB_cursor *mc; 1783 MDB_val key, val; 1784 1785 if (mdb_cursor_open(txn, db, &mc)) 1786 return 0; 1787 1788 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 1789 mdb_cursor_close(mc); 1790 return 0; 1791 } 1792 1793 mdb_cursor_close(mc); 1794 1795 assert(key.mv_size == 8); 1796 return *((uint64_t*)key.mv_data); 1797 } 1798 1799 // 1800 // make a search key meant for user queries without any other note info 1801 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 1802 { 1803 memset(key->id, 0, sizeof(key->id)); 1804 key->timestamp = 0; 
1805 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1806 key->search[sizeof(key->search) - 1] = '\0'; 1807 } 1808 1809 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 1810 { 1811 int rc; 1812 struct ndb_search_key s; 1813 MDB_val k, v; 1814 search->cursor = NULL; 1815 1816 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 1817 1818 ndb_make_search_key_low(&s, query); 1819 1820 k.mv_data = &s; 1821 k.mv_size = sizeof(s); 1822 1823 if ((rc = mdb_cursor_open(txn->mdb_txn, 1824 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1825 cursor))) { 1826 printf("search_profile: cursor opened failed: %s\n", 1827 mdb_strerror(rc)); 1828 return 0; 1829 } 1830 1831 // Position cursor at the next key greater than or equal to the specified key 1832 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 1833 printf("search_profile: cursor get failed\n"); 1834 goto cleanup; 1835 } else { 1836 search->key = k.mv_data; 1837 assert(v.mv_size == 8); 1838 search->profile_key = *((uint64_t*)v.mv_data); 1839 return 1; 1840 } 1841 1842 cleanup: 1843 mdb_cursor_close(search->cursor); 1844 search->cursor = NULL; 1845 return 0; 1846 } 1847 1848 void ndb_search_profile_end(struct ndb_search *search) 1849 { 1850 if (search->cursor) 1851 mdb_cursor_close(search->cursor); 1852 } 1853 1854 int ndb_search_profile_next(struct ndb_search *search) 1855 { 1856 int rc; 1857 MDB_val k, v; 1858 unsigned char *init_id; 1859 1860 init_id = search->key->id; 1861 k.mv_data = search->key; 1862 k.mv_size = sizeof(*search->key); 1863 1864 retry: 1865 if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) { 1866 ndb_debug("ndb_search_profile_next: %s\n", 1867 mdb_strerror(rc)); 1868 return 0; 1869 } else { 1870 search->key = k.mv_data; 1871 assert(v.mv_size == 8); 1872 search->profile_key = *((uint64_t*)v.mv_data); 1873 1874 // skip duplicate pubkeys 1875 if (!memcmp(init_id, search->key->id, 32)) 1876 goto retry; 1877 } 1878 1879 return 1; 1880 } 
1881 1882 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 1883 { 1884 int cmp; 1885 struct ndb_search_key *ska, *skb; 1886 1887 ska = a->mv_data; 1888 skb = b->mv_data; 1889 1890 MDB_val a2 = *a; 1891 MDB_val b2 = *b; 1892 1893 a2.mv_data = ska->search; 1894 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 1895 1896 cmp = mdb_cmp_memn(&a2, &b2); 1897 if (cmp) return cmp; 1898 1899 if (ska->timestamp < skb->timestamp) 1900 return -1; 1901 else if (ska->timestamp > skb->timestamp) 1902 return 1; 1903 1904 return 0; 1905 } 1906 1907 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 1908 1909 { 1910 MDB_val key, val; 1911 int rc; 1912 struct ndb_tsid tsid; 1913 MDB_dbi pk_db; 1914 1915 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 1916 1917 // write profile_pk + created_at index 1918 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 1919 1920 key.mv_data = &tsid; 1921 key.mv_size = sizeof(tsid); 1922 val.mv_data = &profile_key; 1923 val.mv_size = sizeof(profile_key); 1924 1925 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 1926 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 1927 profile_key, mdb_strerror(rc)); 1928 return 0; 1929 } 1930 1931 return 1; 1932 } 1933 1934 static int ndb_write_profile(struct ndb_txn *txn, 1935 struct ndb_writer_profile *profile, 1936 uint64_t note_key) 1937 { 1938 uint64_t profile_key; 1939 struct ndb_note *note; 1940 void *flatbuf; 1941 size_t flatbuf_len; 1942 int rc; 1943 1944 MDB_val key, val; 1945 MDB_dbi profile_db; 1946 1947 note = profile->note.note; 1948 1949 // add note_key to profile record 1950 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 1951 NdbProfileRecord_end_as_root(profile->record.builder); 1952 1953 flatbuf = profile->record.flatbuf = 1954 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 1955 1956 assert(((uint64_t)flatbuf % 8) == 0); 1957 1958 // TODO: this may not be 
safe!? 1959 flatbuf_len = (flatbuf_len + 7) & ~7; 1960 1961 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 1962 1963 // get dbs 1964 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 1965 1966 // get new key 1967 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 1968 1969 // write profile to profile store 1970 key.mv_data = &profile_key; 1971 key.mv_size = sizeof(profile_key); 1972 val.mv_data = flatbuf; 1973 val.mv_size = flatbuf_len; 1974 1975 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 1976 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 1977 return 0; 1978 } 1979 1980 // write last fetched record 1981 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 1982 ndb_debug("failed to write last profile fetched record\n"); 1983 return 0; 1984 } 1985 1986 // write profile pubkey index 1987 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 1988 ndb_debug("failed to write profile pubkey index\n"); 1989 return 0; 1990 } 1991 1992 // write name, display_name profile search indices 1993 if (!ndb_write_profile_search_indices(txn, note, profile_key, 1994 flatbuf)) { 1995 ndb_debug("failed to write profile search indices\n"); 1996 return 0; 1997 } 1998 1999 return 1; 2000 } 2001 2002 // find the last id tag in a note (e, p, etc) 2003 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 2004 { 2005 unsigned char *last = NULL; 2006 struct ndb_iterator iter; 2007 struct ndb_str str; 2008 2009 // get the liked event id (last id) 2010 ndb_tags_iterate_start(note, &iter); 2011 2012 while (ndb_tags_iterate_next(&iter)) { 2013 if (iter.tag->count < 2) 2014 continue; 2015 2016 str = ndb_tag_str(note, iter.tag, 0); 2017 2018 // assign liked to the last e tag 2019 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 2020 str = ndb_tag_str(note, iter.tag, 1); 2021 if (str.flag == NDB_PACKED_ID) 2022 last = str.id; 2023 } 2024 } 2025 2026 return last; 2027 } 2028 2029 void 
*ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 2030 { 2031 MDB_val k, v; 2032 2033 k.mv_data = (unsigned char*)id; 2034 k.mv_size = 32; 2035 2036 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 2037 ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 2038 return NULL; 2039 } 2040 2041 if (len) 2042 *len = v.mv_size; 2043 2044 return v.mv_data; 2045 } 2046 2047 // When receiving a reaction note, look for the liked id and increase the 2048 // reaction counter in the note metadata database 2049 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 2050 { 2051 size_t len; 2052 void *root; 2053 int reactions, rc; 2054 MDB_val key, val; 2055 NdbEventMeta_table_t meta; 2056 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 2057 2058 if (liked == NULL) 2059 return 0; 2060 2061 root = ndb_get_note_meta(txn, liked, &len); 2062 2063 flatcc_builder_t builder; 2064 flatcc_builder_init(&builder); 2065 NdbEventMeta_start_as_root(&builder); 2066 2067 // no meta record, let's make one 2068 if (root == NULL) { 2069 NdbEventMeta_reactions_add(&builder, 1); 2070 } else { 2071 // clone existing and add to it 2072 meta = NdbEventMeta_as_root(root); 2073 2074 reactions = NdbEventMeta_reactions_get(meta); 2075 NdbEventMeta_clone(&builder, meta); 2076 NdbEventMeta_reactions_add(&builder, reactions + 1); 2077 } 2078 2079 NdbProfileRecord_end_as_root(&builder); 2080 root = flatcc_builder_finalize_aligned_buffer(&builder, &len); 2081 assert(((uint64_t)root % 8) == 0); 2082 2083 if (root == NULL) { 2084 ndb_debug("failed to create note metadata record\n"); 2085 return 0; 2086 } 2087 2088 // metadata is keyed on id because we want to collect stats regardless 2089 // if we have the note yet or not 2090 key.mv_data = liked; 2091 key.mv_size = 32; 2092 2093 val.mv_data = root; 2094 val.mv_size = len; 2095 2096 // write the new meta record 2097 //ndb_debug("writing stats record for "); 2098 //print_hex(liked, 32); 
2099 //ndb_debug("\n"); 2100 2101 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 2102 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 2103 return 0; 2104 } 2105 2106 free(root); 2107 2108 return 1; 2109 } 2110 2111 2112 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 2113 uint64_t note_key) 2114 2115 { 2116 struct ndb_tsid tsid; 2117 int rc; 2118 MDB_val key, val; 2119 MDB_dbi id_db; 2120 2121 ndb_tsid_init(&tsid, note->id, note->created_at); 2122 2123 key.mv_data = &tsid; 2124 key.mv_size = sizeof(tsid); 2125 val.mv_data = ¬e_key; 2126 val.mv_size = sizeof(note_key); 2127 2128 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2129 2130 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 2131 ndb_debug("write note id index to db failed: %s\n", 2132 mdb_strerror(rc)); 2133 return 0; 2134 } 2135 2136 return 1; 2137 } 2138 2139 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 2140 uint64_t note_key) 2141 { 2142 struct ndb_u64_tsid tsid; 2143 int rc; 2144 MDB_val key, val; 2145 MDB_dbi kind_db; 2146 2147 ndb_u64_tsid_init(&tsid, note->kind, note->created_at); 2148 2149 key.mv_data = &tsid; 2150 key.mv_size = sizeof(tsid); 2151 val.mv_data = ¬e_key; 2152 val.mv_size = sizeof(note_key); 2153 2154 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 2155 2156 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 2157 ndb_debug("write note kind index to db failed: %s\n", 2158 mdb_strerror(rc)); 2159 return 0; 2160 } 2161 2162 return 1; 2163 } 2164 2165 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 2166 int word_len, int word_index, 2167 uint64_t timestamp, uint64_t note_id) 2168 { 2169 // cap to some reasonable key size 2170 unsigned char buffer[1024]; 2171 int keysize, rc; 2172 MDB_val k, v; 2173 MDB_dbi text_db; 2174 2175 // build our compressed text index key 2176 if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index, 
2177 word_len, word, timestamp, note_id, 2178 &keysize)) { 2179 // probably too big 2180 2181 return 0; 2182 } 2183 2184 k.mv_data = buffer; 2185 k.mv_size = keysize; 2186 2187 v.mv_data = NULL; 2188 v.mv_size = 0; 2189 2190 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 2191 2192 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 2193 ndb_debug("write note text index to db failed: %s\n", 2194 mdb_strerror(rc)); 2195 return 0; 2196 } 2197 2198 return 1; 2199 } 2200 2201 2202 2203 // break a string into individual words for querying or for building the 2204 // fulltext search index. This is callback based so we don't need to 2205 // build up an intermediate structure 2206 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 2207 { 2208 int word_len, words; 2209 const char *word; 2210 2211 words = 0; 2212 2213 while (cur->p < cur->end) { 2214 consume_whitespace_or_punctuation(cur); 2215 if (cur->p >= cur->end) 2216 break; 2217 word = (const char *)cur->p; 2218 2219 if (!consume_until_boundary(cur)) 2220 break; 2221 2222 // start of word or end 2223 word_len = cur->p - (unsigned char *)word; 2224 if (word_len == 0 && cur->p >= cur->end) 2225 break; 2226 2227 if (word_len == 0) { 2228 if (!cursor_skip(cur, 1)) 2229 break; 2230 continue; 2231 } 2232 2233 //ndb_debug("writing word index '%.*s'\n", word_len, word); 2234 2235 if (!fn(ctx, word, word_len, words)) 2236 continue; 2237 2238 words++; 2239 } 2240 2241 return 1; 2242 } 2243 2244 struct ndb_word_writer_ctx 2245 { 2246 struct ndb_txn *txn; 2247 struct ndb_note *note; 2248 uint64_t note_id; 2249 }; 2250 2251 static int ndb_fulltext_word_writer(void *ctx, 2252 const char *word, int word_len, int words) 2253 { 2254 struct ndb_word_writer_ctx *wctx = ctx; 2255 2256 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 2257 wctx->note->created_at, wctx->note_id)) { 2258 // too big to write this one, just skip it 2259 ndb_debug("failed to write word '%.*s' to index\n", word_len, 
word); 2260 2261 return 0; 2262 } 2263 2264 //fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word); 2265 return 1; 2266 } 2267 2268 static int ndb_write_note_fulltext_index(struct ndb_txn *txn, 2269 struct ndb_note *note, 2270 uint64_t note_id) 2271 { 2272 struct cursor cur; 2273 unsigned char *content; 2274 struct ndb_str str; 2275 struct ndb_word_writer_ctx ctx; 2276 2277 str = ndb_note_str(note, ¬e->content); 2278 // I don't think this should happen? 2279 if (unlikely(str.flag == NDB_PACKED_ID)) 2280 return 0; 2281 2282 content = (unsigned char *)str.str; 2283 2284 make_cursor(content, content + note->content_length, &cur); 2285 2286 ctx.txn = txn; 2287 ctx.note = note; 2288 ctx.note_id = note_id; 2289 2290 ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer); 2291 2292 return 1; 2293 } 2294 2295 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index) 2296 { 2297 (void)word_index; 2298 struct ndb_search_words *words = ctx; 2299 struct ndb_word *word; 2300 2301 if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS) 2302 return 0; 2303 2304 word = &words->words[words->num_words++]; 2305 word->word = word_str; 2306 word->word_len = word_len; 2307 2308 return 1; 2309 } 2310 2311 static void ndb_search_words_init(struct ndb_search_words *words) 2312 { 2313 words->num_words = 0; 2314 } 2315 2316 static int prefix_count(const char *str1, int len1, const char *str2, int len2) { 2317 int i, count = 0; 2318 int min_len = len1 < len2 ? 
len1 : len2; 2319 2320 for (i = 0; i < min_len; i++) { 2321 // case insensitive 2322 if (tolower(str1[i]) == tolower(str2[i])) 2323 count++; 2324 else 2325 break; 2326 } 2327 2328 return count; 2329 } 2330 2331 static void ndb_print_text_search_key(struct ndb_text_search_key *key) 2332 { 2333 printf("K<'%.*s' %d %" PRIu64 " note_id:%" PRIu64 ">", key->str_len, key->str, 2334 key->word_index, 2335 key->timestamp, 2336 key->note_id); 2337 } 2338 2339 static int ndb_prefix_matches(struct ndb_text_search_result *result, 2340 struct ndb_word *search_word) 2341 { 2342 // Empty strings shouldn't happen but let's 2343 if (result->key.str_len < 2 || search_word->word_len < 2) 2344 return 0; 2345 2346 // make sure we at least have two matching prefix characters. exact 2347 // matches are nice but range searches allow us to match prefixes as 2348 // well. A double-char prefix is suffient, but maybe we could up this 2349 // in the future. 2350 // 2351 // TODO: How are we handling utf-8 prefix matches like 2352 // japanese? 2353 // 2354 if ( result->key.str[0] != tolower(search_word->word[0]) 2355 && result->key.str[1] != tolower(search_word->word[1]) 2356 ) 2357 return 0; 2358 2359 // count the number of prefix-matched characters. This will be used 2360 // for ranking search results 2361 result->prefix_chars = prefix_count(result->key.str, 2362 result->key.str_len, 2363 search_word->word, 2364 search_word->word_len); 2365 2366 if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 2367 return 0; 2368 2369 return 1; 2370 } 2371 2372 // This is called when scanning the full text search index. 
Scanning stops 2373 // when we no longer have a prefix match for the word 2374 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, 2375 MDB_val *k, struct ndb_word *search_word, 2376 struct ndb_text_search_result *last_result, 2377 struct ndb_text_search_result *result, 2378 MDB_cursor_op order_op) 2379 { 2380 struct cursor key_cursor; 2381 //struct ndb_text_search_key search_key; 2382 MDB_val v; 2383 int retries; 2384 retries = -1; 2385 2386 make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); 2387 2388 // When op is MDB_SET_RANGE, this initializes the search. Position 2389 // the cursor at the next key greater than or equal to the specified 2390 // key. 2391 // 2392 // Subsequent searches should use MDB_NEXT 2393 if (mdb_cursor_get(cursor, k, &v, op)) { 2394 // we should only do this if we're going in reverse 2395 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 2396 // if set range worked and our key exists, it should be 2397 // the one right before this one 2398 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 2399 return 0; 2400 } else { 2401 return 0; 2402 } 2403 } 2404 2405 retry: 2406 retries++; 2407 /* 2408 printf("continuing from "); 2409 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 2410 ndb_print_text_search_key(&search_key); 2411 } else { printf("??"); } 2412 printf("\n"); 2413 */ 2414 2415 make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); 2416 2417 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 2418 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 2419 return 0; 2420 } 2421 2422 if (last_result) { 2423 if (last_result->key.note_id != result->key.note_id) 2424 return 0; 2425 } 2426 2427 // On success, this could still be not related at all. 2428 // It could just be adjacent to the word. Let's check 2429 // if we have a matching prefix at least. 
2430 2431 // Before we unpack the entire key, let's quickly 2432 // unpack just the string to check the prefix. We don't 2433 // need to unpack the entire key if the prefix doesn't 2434 // match 2435 if (!ndb_unpack_text_search_key_string(&key_cursor, 2436 &result->key.str, 2437 &result->key.str_len)) { 2438 // this should never happen 2439 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 2440 return 0; 2441 } 2442 2443 if (!ndb_prefix_matches(result, search_word)) { 2444 /* 2445 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 2446 result->key.str_len, result->key.str, 2447 search_word->word_len, search_word->word); 2448 */ 2449 // we should only do this if we're going in reverse 2450 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 2451 // if set range worked and our key exists, it should be 2452 // the one right before this one 2453 mdb_cursor_get(cursor, k, &v, MDB_PREV); 2454 goto retry; 2455 } else { 2456 return 0; 2457 } 2458 } 2459 2460 // Unpack the remaining text search key, we will need this information 2461 // when building up our search results. 
2462 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 2463 // This should never happen 2464 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 2465 return 0; 2466 } 2467 2468 if (last_result) { 2469 if (result->key.word_index < last_result->key.word_index) { 2470 /* 2471 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 2472 result->key.str_len, result->key.str, 2473 last_result->key.str_len, last_result->key.str); 2474 */ 2475 return 0; 2476 } 2477 } 2478 2479 return 1; 2480 } 2481 2482 static void ndb_text_search_results_init( 2483 struct ndb_text_search_results *results) { 2484 results->num_results = 0; 2485 } 2486 2487 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 2488 { 2489 cfg->order = NDB_ORDER_DESCENDING; 2490 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 2491 } 2492 2493 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 2494 enum ndb_search_order order) 2495 { 2496 cfg->order = order; 2497 } 2498 2499 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 2500 { 2501 cfg->limit = limit; 2502 } 2503 2504 int ndb_text_search(struct ndb_txn *txn, const char *query, 2505 struct ndb_text_search_results *results, 2506 struct ndb_text_search_config *config) 2507 { 2508 unsigned char buffer[1024], *buf; 2509 unsigned char saved_buf[1024], *saved; 2510 struct ndb_text_search_result *result, *last_result; 2511 struct ndb_text_search_result candidate, last_candidate; 2512 struct ndb_search_words search_words; 2513 //struct ndb_text_search_key search_key; 2514 struct ndb_word *search_word; 2515 struct cursor cur; 2516 ndb_text_search_key_order_fn key_order_fn; 2517 MDB_dbi text_db; 2518 MDB_cursor *cursor; 2519 MDB_val k, v; 2520 int i, j, keysize, saved_size, limit; 2521 MDB_cursor_op op, order_op; 2522 //int num_note_ids; 2523 2524 saved = NULL; 2525 ndb_text_search_results_init(results); 2526 ndb_search_words_init(&search_words); 
2527 2528 // search config 2529 limit = MAX_TEXT_SEARCH_RESULTS; 2530 order_op = MDB_PREV; 2531 key_order_fn = ndb_make_text_search_key_high; 2532 if (config) { 2533 if (config->order == NDB_ORDER_ASCENDING) { 2534 order_op = MDB_NEXT; 2535 key_order_fn = ndb_make_text_search_key_low; 2536 } 2537 limit = min(limit, config->limit); 2538 } 2539 // end search config 2540 2541 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 2542 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 2543 2544 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 2545 if (search_words.num_words == 0) 2546 return 0; 2547 2548 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 2549 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 2550 return 0; 2551 } 2552 2553 // for each word, we recursively find all of the submatches 2554 while (results->num_results < limit) { 2555 last_result = NULL; 2556 result = &results->results[results->num_results]; 2557 2558 // if we have saved, then we continue from the last root search 2559 // sequence 2560 if (saved) { 2561 buf = saved_buf; 2562 saved = NULL; 2563 keysize = saved_size; 2564 2565 k.mv_data = buf; 2566 k.mv_size = saved_size; 2567 2568 // reposition the cursor so we can continue 2569 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 2570 break; 2571 2572 op = order_op; 2573 } else { 2574 // construct a packed fulltext search key using this 2575 // word this key doesn't contain any timestamp or index 2576 // info, so it should range match instead of exact 2577 // match 2578 if (!key_order_fn(buffer, sizeof(buffer), 2579 search_words.words[0].word_len, 2580 search_words.words[0].word, &keysize)) 2581 { 2582 // word is too big to fit in 1024-sized key 2583 continue; 2584 } 2585 2586 buf = buffer; 2587 op = MDB_SET_RANGE; 2588 } 2589 2590 for (j = 0; j < search_words.num_words; j++) { 2591 search_word = &search_words.words[j]; 2592 2593 // shouldn't happen but let's be defensive a 
bit 2594 if (search_word->word_len == 0) 2595 continue; 2596 2597 // if we already matched a note in this phrase, make 2598 // sure we're including the note id in the query 2599 if (last_result) { 2600 // we are narrowing down a search. 2601 // if we already have this note id, just continue 2602 for (i = 0; i < results->num_results; i++) { 2603 if (results->results[i].key.note_id == last_result->key.note_id) 2604 goto cont; 2605 } 2606 2607 if (!ndb_make_noted_text_search_key( 2608 buffer, sizeof(buffer), 2609 search_word->word_len, 2610 search_word->word, 2611 last_result->key.timestamp, 2612 last_result->key.note_id, 2613 &keysize)) 2614 { 2615 continue; 2616 } 2617 2618 buf = buffer; 2619 } 2620 2621 k.mv_data = buf; 2622 k.mv_size = keysize; 2623 2624 if (!ndb_text_search_next_word(cursor, op, &k, 2625 search_word, 2626 last_result, 2627 &candidate, 2628 order_op)) { 2629 break; 2630 } 2631 2632 *result = candidate; 2633 op = MDB_SET_RANGE; 2634 2635 // save the first key match, since we will continue from 2636 // this on the next root word result 2637 if (j == 0 && !saved) { 2638 memcpy(saved_buf, k.mv_data, k.mv_size); 2639 saved = saved_buf; 2640 saved_size = k.mv_size; 2641 } 2642 2643 last_candidate = *result; 2644 last_result = &last_candidate; 2645 } 2646 2647 cont: 2648 // we matched all of the queries! 
2649 if (j == search_words.num_words) { 2650 results->num_results++; 2651 } else if (j == 0) { 2652 break; 2653 } 2654 2655 } 2656 2657 mdb_cursor_close(cursor); 2658 2659 return 1; 2660 } 2661 2662 static uint64_t ndb_write_note(struct ndb_txn *txn, 2663 struct ndb_writer_note *note) 2664 { 2665 int rc; 2666 uint64_t note_key; 2667 MDB_dbi note_db; 2668 MDB_val key, val; 2669 2670 // let's quickly sanity check if we already have this note 2671 if (ndb_get_notekey_by_id(txn, note->note->id)) 2672 return 0; 2673 2674 // get dbs 2675 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 2676 2677 // get new key 2678 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 2679 2680 // write note to event store 2681 key.mv_data = ¬e_key; 2682 key.mv_size = sizeof(note_key); 2683 val.mv_data = note->note; 2684 val.mv_size = note->note_len; 2685 2686 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 2687 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 2688 return 0; 2689 } 2690 2691 // write id index key clustered with created_at 2692 if (!ndb_write_note_id_index(txn, note->note, note_key)) 2693 return 0; 2694 2695 // write note kind index 2696 if (!ndb_write_note_kind_index(txn, note->note, note_key)) 2697 return 0; 2698 2699 // only do fulltext index on text and longform notes 2700 if (note->note->kind == 1 || note->note->kind == 30023) { 2701 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 2702 return 0; 2703 } 2704 2705 if (note->note->kind == 7) { 2706 ndb_write_reaction_stats(txn, note->note); 2707 } 2708 2709 return note_key; 2710 } 2711 2712 // only to be called from the writer thread 2713 static void ndb_write_version(struct ndb_txn *txn, uint64_t version) 2714 { 2715 int rc; 2716 MDB_val key, val; 2717 uint64_t version_key; 2718 2719 version_key = NDB_META_KEY_VERSION; 2720 2721 key.mv_data = &version_key; 2722 key.mv_size = sizeof(version_key); 2723 val.mv_data = &version; 2724 val.mv_size = sizeof(version); 2725 2726 if ((rc = 
// Entry point of the writer thread. `data` is the struct ndb_writer whose
// inbox this thread drains.
//
// Each pass of the outer loop:
//   1. pops up to THREAD_QUEUE_BATCH messages from the inbox,
//   2. begins one LMDB transaction (flags 0) if any message needs a write,
//   3. applies every message in the batch inside that transaction,
//   4. ends the transaction with ndb_end_query,
//   5. frees the note buffers (and profile record builders) carried by
//      the messages.
//
// The loop exits after a batch that contained NDB_WRITER_QUIT has been
// fully processed. Always returns NULL.
static void *ndb_writer_thread(void *data)
{
	struct ndb_writer *writer = data;
	struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
	int i, popped, done, any_note;
	uint64_t note_nkey;
	MDB_txn *mdb_txn = NULL;
	struct ndb_txn txn;
	// txn starts out with no underlying LMDB transaction; one is opened
	// per batch below
	ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);

	done = 0;
	while (!done) {
		txn.mdb_txn = NULL;
		popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
		//ndb_debug("writer popped %d items\n", popped);

		// first pass: find out whether this batch contains anything
		// that has to be written. Every message type except QUIT
		// counts, despite the name `any_note`.
		any_note = 0;
		for (i = 0 ; i < popped; i++) {
			msg = &msgs[i];
			switch (msg->type) {
			case NDB_WRITER_NOTE: any_note = 1; break;
			case NDB_WRITER_PROFILE: any_note = 1; break;
			case NDB_WRITER_DBMETA: any_note = 1; break;
			case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break;
			case NDB_WRITER_QUIT: break;
			}
		}

		if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn))
		{
			fprintf(stderr, "writer thread txn_begin failed");
			// should definitely not happen unless DB is full
			// or something ?
			// NOTE(review): if assert() is compiled out, execution
			// continues into the write loop below -- confirm that
			// this is acceptable.
			assert(false);
		}

		// second pass: apply each message
		for (i = 0; i < popped; i++) {
			msg = &msgs[i];

			switch (msg->type) {
			case NDB_WRITER_QUIT:
				// remember to stop, but keep processing the
				// rest of this batch (`continue` advances the
				// enclosing for loop)
				done = 1;
				continue;
			case NDB_WRITER_PROFILE:
				// NOTE(review): this passes &msg->note while
				// the free loop below uses msg->profile.note;
				// presumably the two alias -- confirm against
				// struct ndb_writer_msg.
				note_nkey =
					ndb_write_note(&txn, &msg->note);
				if (msg->profile.record.builder) {
					// only write if parsing didn't fail
					ndb_write_profile(&txn, &msg->profile,
							  note_nkey);
				}
				break;
			case NDB_WRITER_NOTE:
				ndb_write_note(&txn, &msg->note);
				//printf("wrote note ");
				//print_hex(msg->note.note->id, 32);
				//printf("\n");
				break;
			case NDB_WRITER_DBMETA:
				ndb_write_version(&txn, msg->ndb_meta.version);
				break;
			case NDB_WRITER_PROFILE_LAST_FETCH:
				ndb_writer_last_profile_fetch(&txn,
						msg->last_fetch.pubkey,
						msg->last_fetch.fetched_at
						);
				break;
			}
		}

		// commit writes
		if (any_note && !ndb_end_query(&txn)) {
			fprintf(stderr, "writer thread txn commit failed");
			assert(false);
		}

		// free notes: the buffers attached to note and profile
		// messages are released here, after the transaction has ended
		for (i = 0; i < popped; i++) {
			msg = &msgs[i];
			if (msg->type == NDB_WRITER_NOTE)
				free(msg->note.note);
			else if (msg->type == NDB_WRITER_PROFILE) {
				free(msg->profile.note.note);
				ndb_profile_record_builder_free(&msg->profile.record);
			}
		}
	}

	ndb_debug("quitting writer thread\n");
	return NULL;
}
ndb_debug("started ingester thread\n"); 2842 2843 done = 0; 2844 while (!done) { 2845 to_write = 0; 2846 any_event = 0; 2847 2848 popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 2849 //ndb_debug("ingester popped %d items\n", popped); 2850 2851 for (i = 0; i < popped; i++) { 2852 msg = &msgs[i]; 2853 if (msg->type == NDB_INGEST_EVENT) { 2854 any_event = 1; 2855 break; 2856 } 2857 } 2858 2859 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 2860 // this is bad 2861 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 2862 mdb_strerror(rc)); 2863 continue; 2864 } 2865 2866 for (i = 0; i < popped; i++) { 2867 msg = &msgs[i]; 2868 switch (msg->type) { 2869 case NDB_INGEST_QUIT: 2870 done = 1; 2871 break; 2872 2873 case NDB_INGEST_EVENT: 2874 out = &outs[to_write]; 2875 if (ndb_ingester_process_event(ctx, ingester, 2876 &msg->event, out, 2877 read_txn)) { 2878 to_write++; 2879 } 2880 } 2881 } 2882 2883 if (any_event) 2884 mdb_txn_abort(read_txn); 2885 2886 if (to_write > 0) { 2887 //ndb_debug("pushing %d events to write queue\n", to_write); 2888 if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) { 2889 ndb_debug("failed pushing %d events to write queue\n", to_write); 2890 } 2891 } 2892 } 2893 2894 ndb_debug("quitting ingester thread\n"); 2895 secp256k1_context_destroy(ctx); 2896 return NULL; 2897 } 2898 2899 2900 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb) 2901 { 2902 writer->lmdb = lmdb; 2903 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 2904 writer->queue_buf = malloc(writer->queue_buflen); 2905 if (writer->queue_buf == NULL) { 2906 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 2907 return 0; 2908 } 2909 2910 // init the writer queue. 
2911 prot_queue_init(&writer->inbox, writer->queue_buf, 2912 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 2913 2914 // spin up the writer thread 2915 if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer)) 2916 { 2917 fprintf(stderr, "ndb writer thread failed to create\n"); 2918 return 0; 2919 } 2920 2921 return 1; 2922 } 2923 2924 // initialize the ingester queue and then spawn the thread 2925 static int ndb_ingester_init(struct ndb_ingester *ingester, 2926 struct ndb_writer *writer, 2927 struct ndb_config *config) 2928 { 2929 int elem_size, num_elems; 2930 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 2931 2932 // TODO: configurable queue sizes 2933 elem_size = sizeof(struct ndb_ingester_msg); 2934 num_elems = DEFAULT_QUEUE_SIZE; 2935 2936 ingester->writer = writer; 2937 ingester->flags = config->flags; 2938 ingester->filter = config->ingest_filter; 2939 ingester->filter_context = config->filter_context; 2940 2941 if (!threadpool_init(&ingester->tp, config->ingester_threads, 2942 elem_size, num_elems, &quit_msg, ingester, 2943 ndb_ingester_thread)) 2944 { 2945 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 2946 return 0; 2947 } 2948 2949 return 1; 2950 } 2951 2952 static int ndb_writer_destroy(struct ndb_writer *writer) 2953 { 2954 struct ndb_writer_msg msg; 2955 2956 // kill thread 2957 msg.type = NDB_WRITER_QUIT; 2958 if (!prot_queue_push(&writer->inbox, &msg)) { 2959 // queue is too full to push quit message. just kill it. 
2960 pthread_exit(&writer->thread_id); 2961 } else { 2962 pthread_join(writer->thread_id, NULL); 2963 } 2964 2965 // cleanup 2966 prot_queue_destroy(&writer->inbox); 2967 2968 free(writer->queue_buf); 2969 2970 return 1; 2971 } 2972 2973 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 2974 { 2975 threadpool_destroy(&ingester->tp); 2976 return 1; 2977 } 2978 2979 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 2980 char *json, unsigned len, unsigned client) 2981 { 2982 struct ndb_ingester_msg msg; 2983 msg.type = NDB_INGEST_EVENT; 2984 2985 msg.event.json = json; 2986 msg.event.len = len; 2987 msg.event.client = client; 2988 2989 return threadpool_dispatch(&ingester->tp, &msg); 2990 } 2991 2992 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 2993 { 2994 int rc; 2995 MDB_txn *txn; 2996 2997 if ((rc = mdb_env_create(&lmdb->env))) { 2998 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 2999 return 0; 3000 } 3001 3002 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 3003 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 3004 return 0; 3005 } 3006 3007 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 3008 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 3009 return 0; 3010 } 3011 3012 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 3013 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 3014 return 0; 3015 } 3016 3017 // Initialize DBs 3018 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 3019 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 3020 return 0; 3021 } 3022 3023 // note flatbuffer db 3024 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 3025 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 3026 return 0; 3027 } 3028 3029 // note metadata db 3030 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 3031 fprintf(stderr, "mdb_dbi_open meta 
failed, error %d\n", rc); 3032 return 0; 3033 } 3034 3035 // profile flatbuffer db 3036 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 3037 fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc); 3038 return 0; 3039 } 3040 3041 // profile search db 3042 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 3043 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 3044 return 0; 3045 } 3046 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 3047 3048 // ndb metadata (db version, etc) 3049 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 3050 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 3051 return 0; 3052 } 3053 3054 // profile last fetches 3055 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 3056 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 3057 return 0; 3058 } 3059 3060 // id+ts index flags 3061 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 3062 3063 // index dbs 3064 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 3065 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3066 return 0; 3067 } 3068 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 3069 3070 if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 3071 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3072 return 0; 3073 } 3074 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare); 3075 3076 if ((rc = mdb_dbi_open(txn, "note_kind", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 3077 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3078 return 0; 3079 } 3080 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare); 3081 3082 
if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 3083 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 3084 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 3085 return 0; 3086 } 3087 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 3088 3089 // Commit the transaction 3090 if ((rc = mdb_txn_commit(txn))) { 3091 fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); 3092 return 0; 3093 } 3094 3095 return 1; 3096 } 3097 3098 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 3099 { 3100 struct ndb_writer_msg msg; 3101 msg.type = NDB_WRITER_DBMETA; 3102 msg.ndb_meta.version = version; 3103 return ndb_writer_queue_msg(&ndb->writer, &msg); 3104 } 3105 3106 static int ndb_run_migrations(struct ndb *ndb) 3107 { 3108 int64_t version, latest_version, i; 3109 3110 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 3111 3112 if ((version = ndb_db_version(ndb)) == -1) { 3113 ndb_debug("run_migrations: no version found, assuming new db\n"); 3114 version = latest_version; 3115 3116 // no version found. fresh db? 
3117 if (!ndb_queue_write_version(ndb, version)) { 3118 fprintf(stderr, "run_migrations: failed writing db version"); 3119 return 0; 3120 } 3121 3122 return 1; 3123 } else { 3124 ndb_debug("ndb: version %" PRIu64 " found\n", version); 3125 } 3126 3127 if (version < latest_version) 3128 ndb_debug("nostrdb: migrating v%d -> v%d\n", 3129 (int)version, (int)latest_version); 3130 3131 for (i = version; i < latest_version; i++) { 3132 if (!MIGRATIONS[i].fn(ndb)) { 3133 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 3134 return 0; 3135 } 3136 3137 if (!ndb_queue_write_version(ndb, i+1)) { 3138 fprintf(stderr, "run_migrations: failed writing db version"); 3139 return 0; 3140 } 3141 3142 version = i+1; 3143 } 3144 3145 ndb->version = version; 3146 3147 return 1; 3148 } 3149 3150 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config) 3151 { 3152 struct ndb *ndb; 3153 //MDB_dbi ind_id; // TODO: ind_pk, etc 3154 3155 ndb = *pndb = calloc(1, sizeof(struct ndb)); 3156 ndb->flags = config->flags; 3157 3158 if (ndb == NULL) { 3159 fprintf(stderr, "ndb_init: malloc failed\n"); 3160 return 0; 3161 } 3162 3163 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 3164 return 0; 3165 3166 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) { 3167 fprintf(stderr, "ndb_writer_init failed\n"); 3168 return 0; 3169 } 3170 3171 if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) { 3172 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 3173 config->ingester_threads); 3174 return 0; 3175 } 3176 3177 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) && 3178 !ndb_run_migrations(ndb)) { 3179 fprintf(stderr, "failed to run migrations\n"); 3180 return 0; 3181 } 3182 3183 // Initialize LMDB environment and spin up threads 3184 return 1; 3185 } 3186 3187 void ndb_destroy(struct ndb *ndb) 3188 { 3189 if (ndb == NULL) 3190 return; 3191 3192 // ingester depends on writer and must be destroyed first 3193 
ndb_ingester_destroy(&ndb->ingester); 3194 ndb_writer_destroy(&ndb->writer); 3195 3196 mdb_env_close(ndb->lmdb.env); 3197 3198 free(ndb); 3199 } 3200 3201 // Process a nostr event from a client 3202 // 3203 // ie: ["EVENT", {"content":"..."} ...] 3204 // 3205 // The client-sent variation of ndb_process_event 3206 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 3207 { 3208 // Since we need to return as soon as possible, and we're not 3209 // making any assumptions about the lifetime of the string, we 3210 // definitely need to copy the json here. In the future once we 3211 // have our thread that manages a websocket connection, we can 3212 // avoid the copy and just use the buffer we get from that 3213 // thread. 3214 char *json_copy = strdupn(json, len); 3215 if (json_copy == NULL) 3216 return 0; 3217 3218 return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1); 3219 } 3220 3221 // Process anostr event from a relay, 3222 // 3223 // ie: ["EVENT", "subid", {"content":"..."}...] 3224 // 3225 // This function returns as soon as possible, first copying the passed 3226 // json and then queueing it up for processing. Worker threads then take 3227 // the json and process it. 3228 // 3229 // Processing: 3230 // 3231 // 1. The event is parsed into ndb_notes and the signature is validated 3232 // 2. A quick lookup is made on the database to see if we already have 3233 // the note id, if we do we don't need to waste time on json parsing 3234 // or note validation. 3235 // 3. Once validation is done we pass it to the writer queue for writing 3236 // to LMDB. 3237 // 3238 int ndb_process_event(struct ndb *ndb, const char *json, int json_len) 3239 { 3240 // Since we need to return as soon as possible, and we're not 3241 // making any assumptions about the lifetime of the string, we 3242 // definitely need to copy the json here. 
// Read newline-delimited relay events from `fp` and hand each line to
// ndb_process_event.
//
// Processing is best effort: the result of ndb_process_event is not
// inspected and reading continues until getline reports end of file or an
// error. Always returns 1.
int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
{
	char *line = NULL;
	size_t cap = 0;
	ssize_t nread;

	while ((nread = getline(&line, &cap, fp)) != -1) {
		if (line == NULL)
			break;

		// getline keeps the trailing newline; drop it so the event
		// length matches what _ndb_process_events passes for a line
		if (nread > 0 && line[nread - 1] == '\n')
			nread--;

		// The length of this event is the number of bytes read.
		// `cap` is only the capacity of getline's buffer (which is
		// what used to be passed here), and may include stale bytes
		// from earlier, longer lines.
		ndb_process_event(ndb, line, (int)nread);
	}

	// free(NULL) is a no-op, so no guard is needed
	free(line);

	return 1;
}
return cursor_push_u16(cur, tag->count); 3316 } 3317 3318 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, 3319 size_t bufsize) 3320 { 3321 struct ndb_note *note; 3322 int half, size, str_indices_size; 3323 3324 // come on bruh 3325 if (bufsize < sizeof(struct ndb_note) * 2) 3326 return 0; 3327 3328 str_indices_size = bufsize / 32; 3329 size = bufsize - str_indices_size; 3330 half = size / 2; 3331 3332 //debug("size %d half %d str_indices %d\n", size, half, str_indices_size); 3333 3334 // make a safe cursor of our available memory 3335 make_cursor(buf, buf + bufsize, &builder->mem); 3336 3337 note = builder->note = (struct ndb_note *)buf; 3338 3339 // take slices of the memory into subcursors 3340 if (!(cursor_slice(&builder->mem, &builder->note_cur, half) && 3341 cursor_slice(&builder->mem, &builder->strings, half) && 3342 cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) { 3343 return 0; 3344 } 3345 3346 memset(note, 0, sizeof(*note)); 3347 builder->note_cur.p += sizeof(*note); 3348 3349 note->strings = builder->strings.start - buf; 3350 note->version = 1; 3351 3352 return 1; 3353 } 3354 3355 3356 3357 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 3358 const char *json, int json_len, 3359 unsigned char *buf, int bufsize) 3360 { 3361 int half = bufsize / 2; 3362 3363 unsigned char *tok_start = buf + half; 3364 unsigned char *tok_end = buf + bufsize; 3365 3366 p->toks = (jsmntok_t*)tok_start; 3367 p->toks_end = (jsmntok_t*)tok_end; 3368 p->num_tokens = 0; 3369 p->json = json; 3370 p->json_len = json_len; 3371 3372 // ndb_builder gets the first half of the buffer, and jsmn gets the 3373 // second half. I like this way of alloating memory (without actually 3374 // dynamically allocating memory). You get one big chunk upfront and 3375 // then submodules can recursively subdivide it. 
// Push the character denoted by a two-character escape sequence, where c1
// is the leading character of the sequence and c2 the one following it.
//
// Recognized escapes are written as the single byte they stand for.
// A 'u' escape is not supported and yields 0. Any other sequence is
// written through unchanged as c1 followed by c2.
//
// Returns non-zero on success, 0 on failure.
static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2)
{
	char decoded;

	switch (c2) {
	case 't': decoded = '\t'; break;
	case 'n': decoded = '\n'; break;
	case 'r': decoded = '\r'; break;
	case 'b': decoded = '\b'; break;
	case 'f': decoded = '\f'; break;
	case '\\':
	case '/':
	case '"':
		// these stand for themselves
		decoded = c2;
		break;
	case 'u':
		// these aren't handled yet
		return 0;
	default:
		// unknown escape: keep both characters verbatim
		return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2);
	}

	return cursor_push_byte(cur, decoded);
}
// Write the NUL-terminated string `str` as a JSON string literal: an
// opening quote, every character passed through
// cursor_push_escaped_char, and a closing quote.
//
// Returns 1 on success, 0 as soon as any push fails.
static int cursor_push_jsonstr(struct cursor *cur, const char *str)
{
	const char *p;
	const char *end = str + strlen(str);

	if (!cursor_push_byte(cur, '"'))
		return 0;

	for (p = str; p != end; p++) {
		if (!cursor_push_escaped_char(cur, *p))
			return 0;
	}

	return cursor_push_byte(cur, '"') ? 1 : 0;
}
&& !cursor_push_byte(cur, ',')) 3525 return 0; 3526 } 3527 3528 return cursor_push_byte(cur, ']'); 3529 } 3530 3531 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 3532 { 3533 int i; 3534 struct ndb_iterator iter, *it = &iter; 3535 ndb_tags_iterate_start(note, it); 3536 3537 if (!cursor_push_byte(cur, '[')) 3538 return 0; 3539 3540 i = 0; 3541 while (ndb_tags_iterate_next(it)) { 3542 if (!cursor_push_json_tag(cur, note, it->tag)) 3543 return 0; 3544 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 3545 return 0; 3546 i++; 3547 } 3548 3549 if (!cursor_push_byte(cur, ']')) 3550 return 0; 3551 3552 return 1; 3553 } 3554 3555 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen) 3556 { 3557 char timebuf[16] = {0}; 3558 char kindbuf[16] = {0}; 3559 char pubkey[65]; 3560 struct cursor cur; 3561 int ok; 3562 3563 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey)) 3564 return 0; 3565 3566 make_cursor(buf, buf + buflen, &cur); 3567 3568 // TODO: update in 2106 ... 
3569 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 3570 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 3571 3572 ok = 3573 cursor_push_str(&cur, "[0,\"") && 3574 cursor_push_str(&cur, pubkey) && 3575 cursor_push_str(&cur, "\",") && 3576 cursor_push_str(&cur, timebuf) && 3577 cursor_push_str(&cur, ",") && 3578 cursor_push_str(&cur, kindbuf) && 3579 cursor_push_str(&cur, ",") && 3580 cursor_push_json_tags(&cur, ev) && 3581 cursor_push_str(&cur, ",") && 3582 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 3583 cursor_push_str(&cur, "]"); 3584 3585 if (!ok) 3586 return 0; 3587 3588 return cur.p - cur.start; 3589 } 3590 3591 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 3592 int len; 3593 3594 if (!(len = ndb_event_commitment(note, buf, buflen))) 3595 return 0; 3596 3597 //fprintf(stderr, "%.*s\n", len, buf); 3598 3599 sha256((struct sha256*)note->id, buf, len); 3600 3601 return 1; 3602 } 3603 3604 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 3605 unsigned char sig[64]) 3606 { 3607 unsigned char aux[32]; 3608 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 3609 3610 if (!fill_random(aux, sizeof(aux))) 3611 return 0; 3612 3613 secp256k1_context *ctx = 3614 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 3615 3616 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 3617 } 3618 3619 int ndb_create_keypair(struct ndb_keypair *kp) 3620 { 3621 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 3622 secp256k1_xonly_pubkey pubkey; 3623 3624 secp256k1_context *ctx = 3625 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 3626 3627 /* Try to create a keypair with a valid context, it should only 3628 * fail if the secret key is zero or out of range. */ 3629 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 3630 return 0; 3631 3632 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 3633 return 0; 3634 3635 /* Serialize the public key. 
Should always return 1 for a valid public key. */ 3636 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 3637 } 3638 3639 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 3640 { 3641 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 3642 fprintf(stderr, "could not hex decode secret key\n"); 3643 return 0; 3644 } 3645 3646 return ndb_create_keypair(keypair); 3647 } 3648 3649 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 3650 struct ndb_keypair *keypair) 3651 { 3652 int strings_len = builder->strings.p - builder->strings.start; 3653 unsigned char *note_end = builder->note_cur.p + strings_len; 3654 int total_size = note_end - builder->note_cur.start; 3655 3656 // move the strings buffer next to the end of our ndb_note 3657 memmove(builder->note_cur.p, builder->strings.start, strings_len); 3658 3659 // set the strings location 3660 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 3661 3662 // record the total size 3663 //builder->note->size = total_size; 3664 3665 *note = builder->note; 3666 3667 // generate id and sign if we're building this manually 3668 if (keypair) { 3669 // use the remaining memory for building our id buffer 3670 unsigned char *end = builder->mem.end; 3671 unsigned char *start = (unsigned char*)(*note) + total_size; 3672 3673 ndb_builder_set_pubkey(builder, keypair->pubkey); 3674 3675 if (!ndb_calculate_id(builder->note, start, end - start)) 3676 return 0; 3677 3678 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 3679 return 0; 3680 } 3681 3682 // make sure we're aligned as a whole 3683 total_size = (total_size + 7) & ~7; 3684 assert((total_size % 8) == 0); 3685 return total_size; 3686 } 3687 3688 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 3689 { 3690 return builder->note; 3691 } 3692 3693 static union ndb_packed_str ndb_offset_str(uint32_t offset) 3694 { 3695 // ensure accidents like -1 don't corrupt our packed_str 
3696 union ndb_packed_str str; 3697 // most significant byte is reserved for ndb_packtype 3698 str.offset = offset & 0xFFFFFF; 3699 return str; 3700 } 3701 3702 3703 /// find an existing string via str_indices. these indices only exist in the 3704 /// builder phase just for this purpose. 3705 static inline int ndb_builder_find_str(struct ndb_builder *builder, 3706 const char *str, int len, 3707 union ndb_packed_str *pstr) 3708 { 3709 // find existing matching string to avoid duplicate strings 3710 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 3711 for (int i = 0; i < indices; i++) { 3712 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 3713 const char *some_str = (const char*)builder->strings.start + index; 3714 3715 if (!memcmp(some_str, str, len)) { 3716 // found an existing matching str, use that index 3717 *pstr = ndb_offset_str(index); 3718 return 1; 3719 } 3720 } 3721 3722 return 0; 3723 } 3724 3725 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 3726 int len, union ndb_packed_str *pstr) 3727 { 3728 uint32_t loc; 3729 3730 // no string found, push a new one 3731 loc = builder->strings.p - builder->strings.start; 3732 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 3733 cursor_push_byte(&builder->strings, '\0'))) { 3734 return 0; 3735 } 3736 3737 *pstr = ndb_offset_str(loc); 3738 3739 // record in builder indices. ignore return value, if we can't cache it 3740 // then whatever 3741 cursor_push_u32(&builder->str_indices, loc); 3742 3743 return 1; 3744 } 3745 3746 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 3747 unsigned char *id, 3748 union ndb_packed_str *pstr) 3749 { 3750 // Don't both find id duplicates. very rarely are they duplicated 3751 // and it slows things down quite a bit. If we really care about this 3752 // We can switch to a hash table. 
3753 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 3754 // pstr->packed.flag = NDB_PACKED_ID; 3755 // return 1; 3756 //} 3757 3758 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 3759 pstr->packed.flag = NDB_PACKED_ID; 3760 return 1; 3761 } 3762 3763 return 0; 3764 } 3765 3766 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2) 3767 { 3768 union ndb_packed_str str; 3769 str.packed.flag = NDB_PACKED_STR; 3770 str.packed.str[0] = c1; 3771 str.packed.str[1] = c2; 3772 str.packed.str[2] = '\0'; 3773 return str; 3774 } 3775 3776 static union ndb_packed_str ndb_char_to_packed_str(char c) 3777 { 3778 union ndb_packed_str str; 3779 str.packed.flag = NDB_PACKED_STR; 3780 str.packed.str[0] = c; 3781 str.packed.str[1] = '\0'; 3782 return str; 3783 } 3784 3785 3786 /// Check for small strings to pack 3787 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 3788 const char *str, int len, 3789 union ndb_packed_str *pstr, 3790 int pack_ids) 3791 { 3792 unsigned char id_buf[32]; 3793 3794 if (len == 0) { 3795 *pstr = ndb_char_to_packed_str(0); 3796 return 1; 3797 } else if (len == 1) { 3798 *pstr = ndb_char_to_packed_str(str[0]); 3799 return 1; 3800 } else if (len == 2) { 3801 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 3802 return 1; 3803 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 3804 return ndb_builder_push_packed_id(builder, id_buf, pstr); 3805 } 3806 3807 return 0; 3808 } 3809 3810 3811 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 3812 const char *str, int len, 3813 union ndb_packed_str *pstr) 3814 { 3815 if (ndb_builder_find_str(builder, str, len, pstr)) 3816 return 1; 3817 3818 return ndb_builder_push_str(builder, str, len, pstr); 3819 } 3820 3821 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 3822 union ndb_packed_str *pstr, int pack_ids) 3823 { 3824 if (ndb_builder_try_compact_str(builder, str, len, pstr, 
pack_ids)) 3825 return 1; 3826 3827 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 3828 } 3829 3830 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 3831 int len) 3832 { 3833 int pack_ids = 0; 3834 builder->note->content_length = len; 3835 return ndb_builder_make_str(builder, content, len, 3836 &builder->note->content, pack_ids); 3837 } 3838 3839 3840 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 3841 const char *s) 3842 { 3843 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 3844 memcmp(json + tok->start, s, tok_len) == 0) { 3845 return 1; 3846 } 3847 return 0; 3848 } 3849 3850 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 3851 union ndb_packed_str offset) 3852 { 3853 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 3854 return 0; 3855 builder->current_tag->count++; 3856 return 1; 3857 } 3858 3859 /// Unescape and push json strings 3860 static int ndb_builder_make_json_str(struct ndb_builder *builder, 3861 const char *str, int len, 3862 union ndb_packed_str *pstr, 3863 int *written, int pack_ids) 3864 { 3865 // let's not care about de-duping these. we should just unescape 3866 // in-place directly into the strings table. 
3867 if (written) 3868 *written = len; 3869 3870 const char *p, *end, *start; 3871 unsigned char *builder_start; 3872 3873 // always try compact strings first 3874 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 3875 return 1; 3876 3877 end = str + len; 3878 start = str; // Initialize start to the beginning of the string 3879 3880 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 3881 builder_start = builder->strings.p; 3882 3883 for (p = str; p < end; p++) { 3884 if (*p == '\\' && p+1 < end) { 3885 // Push the chunk of unescaped characters before this escape sequence 3886 if (start < p && !cursor_push(&builder->strings, 3887 (unsigned char *)start, 3888 p - start)) { 3889 return 0; 3890 } 3891 3892 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 3893 return 0; 3894 3895 p++; // Skip the character following the backslash 3896 start = p + 1; // Update the start pointer to the next character 3897 } 3898 } 3899 3900 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 3901 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 3902 p - start)) { 3903 return 0; 3904 } 3905 3906 if (written) 3907 *written = builder->strings.p - builder_start; 3908 3909 // TODO: dedupe these!? 
/// Parse the leading run of decimal digits in `start[0..len)`.
///
/// Parsing stops at the first non-digit; anything after it is ignored.
/// Returns 1 and stores the value in `*num` when at least one digit was
/// read. Returns 0, leaving `*num` untouched, when there are no leading
/// digits or the value does not fit in an unsigned int.
static int parse_unsigned_int(const char *start, int len, unsigned int *num)
{
	unsigned int acc = 0;
	int idx;

	for (idx = 0; idx < len; idx++) {
		// anything outside '0'..'9' wraps to a value above 9
		unsigned int d = (unsigned int)(unsigned char)start[idx] - '0';

		if (d > 9)
			break;

		// acc * 10 + d would exceed UINT_MAX
		if (acc > (UINT_MAX - d) / 10)
			return 0;

		acc = acc * 10 + d;
	}

	// idx is the number of digits consumed
	if (idx == 0)
		return 0;

	*num = acc;
	return 1;
}
!memcmp("EOSE", json + tok->start, 4)) { 4067 tce->evtype = NDB_TCE_EOSE; 4068 4069 tok = &parser.toks[parser.i++]; 4070 if (tok->type != JSMN_STRING) 4071 return 0; 4072 4073 tce->subid = json + tok->start; 4074 tce->subid_len = toksize(tok); 4075 return 1; 4076 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 4077 if (parser.num_tokens != 5) 4078 return 0; 4079 4080 struct ndb_command_result *cr = &tce->command_result; 4081 4082 tce->evtype = NDB_TCE_OK; 4083 4084 tok = &parser.toks[parser.i++]; 4085 if (tok->type != JSMN_STRING) 4086 return 0; 4087 4088 tce->subid = json + tok->start; 4089 tce->subid_len = toksize(tok); 4090 4091 tok = &parser.toks[parser.i++]; 4092 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 4093 return 0; 4094 4095 cr->ok = (json + tok->start)[0] == 't'; 4096 4097 tok = &parser.toks[parser.i++]; 4098 if (tok->type != JSMN_STRING) 4099 return 0; 4100 4101 tce->command_result.msg = json + tok->start; 4102 tce->command_result.msglen = toksize(tok); 4103 4104 return 1; 4105 } 4106 4107 return 0; 4108 } 4109 4110 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 4111 { 4112 jsmntok_t *tok = NULL; 4113 unsigned char hexbuf[64]; 4114 const char *json = parser->json; 4115 const char *start; 4116 int i, tok_len, parsed; 4117 4118 parsed = 0; 4119 4120 if (parser->toks[parser->i].type != JSMN_OBJECT) 4121 return 0; 4122 4123 // TODO: build id buffer and verify at end 4124 4125 for (i = parser->i + 1; i < parser->num_tokens; i++) { 4126 tok = &parser->toks[i]; 4127 start = json + tok->start; 4128 tok_len = toksize(tok); 4129 4130 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 4131 if (tok_len == 0 || i + 1 >= parser->num_tokens) 4132 continue; 4133 4134 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 4135 // pubkey 4136 tok = &parser->toks[i+1]; 4137 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 4138 parsed |= NDB_PARSED_PUBKEY; 4139 
ndb_builder_set_pubkey(&parser->builder, hexbuf); 4140 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 4141 // id 4142 tok = &parser->toks[i+1]; 4143 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 4144 parsed |= NDB_PARSED_ID; 4145 ndb_builder_set_id(&parser->builder, hexbuf); 4146 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 4147 // sig 4148 tok = &parser->toks[i+1]; 4149 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 4150 parsed |= NDB_PARSED_SIG; 4151 ndb_builder_set_sig(&parser->builder, hexbuf); 4152 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 4153 // kind 4154 tok = &parser->toks[i+1]; 4155 start = json + tok->start; 4156 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 4157 return 0; 4158 if (!parse_unsigned_int(start, toksize(tok), 4159 &parser->builder.note->kind)) 4160 return 0; 4161 parsed |= NDB_PARSED_KIND; 4162 } else if (start[0] == 'c') { 4163 if (jsoneq(json, tok, tok_len, "created_at")) { 4164 // created_at 4165 tok = &parser->toks[i+1]; 4166 start = json + tok->start; 4167 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 4168 return 0; 4169 // TODO: update to int64 in 2106 ... 
xD 4170 unsigned int bigi; 4171 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 4172 return 0; 4173 parser->builder.note->created_at = bigi; 4174 parsed |= NDB_PARSED_CREATED_AT; 4175 } else if (jsoneq(json, tok, tok_len, "content")) { 4176 // content 4177 tok = &parser->toks[i+1]; 4178 union ndb_packed_str pstr; 4179 tok_len = toksize(tok); 4180 int written, pack_ids = 0; 4181 if (!ndb_builder_make_json_str(&parser->builder, 4182 json + tok->start, 4183 tok_len, &pstr, 4184 &written, pack_ids)) { 4185 ndb_debug("ndb_builder_make_json_str failed\n"); 4186 return 0; 4187 } 4188 parser->builder.note->content_length = written; 4189 parser->builder.note->content = pstr; 4190 parsed |= NDB_PARSED_CONTENT; 4191 } 4192 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 4193 tok = &parser->toks[i+1]; 4194 ndb_builder_process_json_tags(parser, tok); 4195 i += tok->size; 4196 parsed |= NDB_PARSED_TAGS; 4197 } 4198 } 4199 4200 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 4201 if (parsed != NDB_PARSED_ALL) 4202 return 0; 4203 4204 return ndb_builder_finalize(&parser->builder, note, NULL); 4205 } 4206 4207 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 4208 unsigned char *buf, int bufsize) 4209 { 4210 struct ndb_json_parser parser; 4211 int res; 4212 4213 ndb_json_parser_init(&parser, json, len, buf, bufsize); 4214 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 4215 return res; 4216 4217 if (parser.num_tokens < 1) 4218 return 0; 4219 4220 return ndb_parse_json_note(&parser, note); 4221 } 4222 4223 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 4224 { 4225 memcpy(builder->note->pubkey, pubkey, 32); 4226 } 4227 4228 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 4229 { 4230 memcpy(builder->note->id, id, 32); 4231 } 4232 4233 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 4234 { 4235 
memcpy(builder->note->sig, sig, 64); 4236 } 4237 4238 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 4239 { 4240 builder->note->kind = kind; 4241 } 4242 4243 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 4244 { 4245 builder->note->created_at = created_at; 4246 } 4247 4248 int ndb_builder_new_tag(struct ndb_builder *builder) 4249 { 4250 builder->note->tags.count++; 4251 struct ndb_tag tag = {0}; 4252 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 4253 return cursor_push_tag(&builder->note_cur, &tag); 4254 } 4255 4256 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 4257 { 4258 counts->count = 0; 4259 counts->key_size = 0; 4260 counts->value_size = 0; 4261 } 4262 4263 static void ndb_stat_init(struct ndb_stat *stat) 4264 { 4265 // init stats 4266 int i; 4267 4268 for (i = 0; i < NDB_CKIND_COUNT; i++) { 4269 ndb_stat_counts_init(&stat->common_kinds[i]); 4270 } 4271 4272 for (i = 0; i < NDB_DBS; i++) { 4273 ndb_stat_counts_init(&stat->dbs[i]); 4274 } 4275 4276 ndb_stat_counts_init(&stat->other_kinds); 4277 } 4278 4279 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 4280 { 4281 int rc; 4282 MDB_cursor *cur; 4283 MDB_val k, v; 4284 MDB_dbi db; 4285 struct ndb_txn txn; 4286 struct ndb_note *note; 4287 int i; 4288 enum ndb_common_kind common_kind; 4289 4290 // initialize to 0 4291 ndb_stat_init(stat); 4292 4293 if (!ndb_begin_query(ndb, &txn)) { 4294 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 4295 return 0; 4296 } 4297 4298 // stat each dbi in the database 4299 for (i = 0; i < NDB_DBS; i++) 4300 { 4301 db = ndb->lmdb.dbs[i]; 4302 4303 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 4304 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 4305 mdb_strerror(rc)); 4306 return 0; 4307 } 4308 4309 // loop over every entry and count kv sizes 4310 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 4311 // we gather more detailed per-kind stats if we're in 
4312 // the notes db 4313 if (i == NDB_DB_NOTE) { 4314 note = v.mv_data; 4315 common_kind = ndb_kind_to_common_kind(note->kind); 4316 4317 // uncommon kind? just count them in bulk 4318 if ((int)common_kind == -1) { 4319 stat->other_kinds.count++; 4320 stat->other_kinds.key_size += k.mv_size; 4321 stat->other_kinds.value_size += v.mv_size; 4322 } else { 4323 stat->common_kinds[common_kind].count++; 4324 stat->common_kinds[common_kind].key_size += k.mv_size; 4325 stat->common_kinds[common_kind].value_size += v.mv_size; 4326 } 4327 } 4328 4329 stat->dbs[i].count++; 4330 stat->dbs[i].key_size += k.mv_size; 4331 stat->dbs[i].value_size += v.mv_size; 4332 } 4333 4334 // close the cursor, they are per-dbi 4335 mdb_cursor_close(cur); 4336 } 4337 4338 ndb_end_query(&txn); 4339 4340 return 1; 4341 } 4342 4343 /// Push an element to the current tag 4344 /// 4345 /// Basic idea is to call ndb_builder_new_tag 4346 inline int ndb_builder_push_tag_str(struct ndb_builder *builder, 4347 const char *str, int len) 4348 { 4349 union ndb_packed_str pstr; 4350 int pack_ids = 1; 4351 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 4352 return 0; 4353 return ndb_builder_finalize_tag(builder, pstr); 4354 } 4355 4356 // 4357 // CONFIG 4358 // 4359 void ndb_default_config(struct ndb_config *config) 4360 { 4361 int cores = get_cpu_cores(); 4362 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 4363 config->ingester_threads = cores == -1 ? 
4 : cores; 4364 config->flags = 0; 4365 config->ingest_filter = NULL; 4366 config->filter_context = NULL; 4367 } 4368 4369 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 4370 { 4371 config->ingester_threads = threads; 4372 } 4373 4374 void ndb_config_set_flags(struct ndb_config *config, int flags) 4375 { 4376 config->flags = flags; 4377 } 4378 4379 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 4380 { 4381 config->mapsize = mapsize; 4382 } 4383 4384 void ndb_config_set_ingest_filter(struct ndb_config *config, 4385 ndb_ingest_filter_fn fn, void *filter_ctx) 4386 { 4387 config->ingest_filter = fn; 4388 config->filter_context = filter_ctx; 4389 } 4390 4391 // used by ndb.c 4392 int ndb_print_search_keys(struct ndb_txn *txn) 4393 { 4394 MDB_cursor *cur; 4395 MDB_val k, v; 4396 int i; 4397 struct ndb_text_search_key search_key; 4398 4399 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 4400 return 0; 4401 4402 i = 1; 4403 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 4404 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 4405 fprintf(stderr, "error decoding key %d\n", i); 4406 continue; 4407 } 4408 4409 ndb_print_text_search_key(&search_key); 4410 printf("\n"); 4411 4412 i++; 4413 } 4414 4415 return 1; 4416 } 4417 4418 struct ndb_tags *ndb_note_tags(struct ndb_note *note) 4419 { 4420 return ¬e->tags; 4421 } 4422 4423 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr) 4424 { 4425 struct ndb_str str; 4426 str.flag = pstr->packed.flag; 4427 4428 if (str.flag == NDB_PACKED_STR) { 4429 str.str = pstr->packed.str; 4430 return str; 4431 } 4432 4433 str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF); 4434 return str; 4435 } 4436 4437 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) 4438 { 4439 return ndb_note_str(note, &tag->strs[ind]); 4440 } 4441 4442 struct ndb_str ndb_iter_tag_str(struct 
ndb_iterator *iter, int ind) 4443 { 4444 return ndb_tag_str(iter->note, iter->tag, ind); 4445 } 4446 4447 unsigned char * ndb_note_id(struct ndb_note *note) 4448 { 4449 return note->id; 4450 } 4451 4452 unsigned char * ndb_note_pubkey(struct ndb_note *note) 4453 { 4454 return note->pubkey; 4455 } 4456 4457 unsigned char * ndb_note_sig(struct ndb_note *note) 4458 { 4459 return note->sig; 4460 } 4461 4462 uint32_t ndb_note_created_at(struct ndb_note *note) 4463 { 4464 return note->created_at; 4465 } 4466 4467 uint32_t ndb_note_kind(struct ndb_note *note) 4468 { 4469 return note->kind; 4470 } 4471 4472 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind) 4473 { 4474 note->kind = kind; 4475 } 4476 4477 const char *ndb_note_content(struct ndb_note *note) 4478 { 4479 return ndb_note_str(note, ¬e->content).str; 4480 } 4481 4482 uint32_t ndb_note_content_length(struct ndb_note *note) 4483 { 4484 return note->content_length; 4485 } 4486 4487 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes) 4488 { 4489 struct ndb_note *note = (struct ndb_note *)bytes; 4490 if (note->version != 1) 4491 return 0; 4492 return note; 4493 } 4494 4495 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) 4496 { 4497 iter->note = note; 4498 iter->tag = NULL; 4499 iter->index = -1; 4500 } 4501 4502 int ndb_tags_iterate_next(struct ndb_iterator *iter) 4503 { 4504 if (iter->tag == NULL || iter->index == -1) { 4505 iter->tag = iter->note->tags.tag; 4506 iter->index = 0; 4507 return iter->note->tags.count != 0; 4508 } 4509 4510 struct ndb_tags *tags = &iter->note->tags; 4511 4512 if (++iter->index < tags->count) { 4513 uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); 4514 iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size); 4515 return 1; 4516 } 4517 4518 return 0; 4519 } 4520 4521 uint16_t ndb_tags_count(struct ndb_tags *tags) 4522 { 4523 return tags->count; 4524 } 4525 4526 uint16_t ndb_tag_count(struct ndb_tag 
*tags) 4527 { 4528 return tags->count; 4529 } 4530 4531 enum ndb_common_kind ndb_kind_to_common_kind(int kind) 4532 { 4533 switch (kind) 4534 { 4535 case 0: return NDB_CKIND_PROFILE; 4536 case 1: return NDB_CKIND_TEXT; 4537 case 3: return NDB_CKIND_CONTACTS; 4538 case 4: return NDB_CKIND_DM; 4539 case 5: return NDB_CKIND_DELETE; 4540 case 6: return NDB_CKIND_REPOST; 4541 case 7: return NDB_CKIND_REACTION; 4542 case 9735: return NDB_CKIND_ZAP; 4543 case 9734: return NDB_CKIND_ZAP_REQUEST; 4544 case 23194: return NDB_CKIND_NWC_REQUEST; 4545 case 23195: return NDB_CKIND_NWC_RESPONSE; 4546 case 27235: return NDB_CKIND_HTTP_AUTH; 4547 case 30000: return NDB_CKIND_LIST; 4548 case 30023: return NDB_CKIND_LONGFORM; 4549 case 30315: return NDB_CKIND_STATUS; 4550 } 4551 4552 return -1; 4553 } 4554 4555 const char *ndb_kind_name(enum ndb_common_kind ck) 4556 { 4557 switch (ck) { 4558 case NDB_CKIND_PROFILE: return "profile"; 4559 case NDB_CKIND_TEXT: return "text"; 4560 case NDB_CKIND_CONTACTS: return "contacts"; 4561 case NDB_CKIND_DM: return "dm"; 4562 case NDB_CKIND_DELETE: return "delete"; 4563 case NDB_CKIND_REPOST: return "repost"; 4564 case NDB_CKIND_REACTION: return "reaction"; 4565 case NDB_CKIND_ZAP: return "zap"; 4566 case NDB_CKIND_ZAP_REQUEST: return "zap_request"; 4567 case NDB_CKIND_NWC_REQUEST: return "nwc_request"; 4568 case NDB_CKIND_NWC_RESPONSE: return "nwc_response"; 4569 case NDB_CKIND_HTTP_AUTH: return "http_auth"; 4570 case NDB_CKIND_LIST: return "list"; 4571 case NDB_CKIND_LONGFORM: return "longform"; 4572 case NDB_CKIND_STATUS: return "status"; 4573 case NDB_CKIND_COUNT: return "unknown"; 4574 } 4575 4576 return "unknown"; 4577 } 4578 4579 const char *ndb_db_name(enum ndb_dbs db) 4580 { 4581 switch (db) { 4582 case NDB_DB_NOTE: 4583 return "note"; 4584 case NDB_DB_META: 4585 return "note_metadata"; 4586 case NDB_DB_PROFILE: 4587 return "profile"; 4588 case NDB_DB_NOTE_ID: 4589 return "note_index"; 4590 case NDB_DB_PROFILE_PK: 4591 return 
"profile_pubkey_index"; 4592 case NDB_DB_NDB_META: 4593 return "nostrdb_metadata"; 4594 case NDB_DB_PROFILE_SEARCH: 4595 return "profile_search"; 4596 case NDB_DB_PROFILE_LAST_FETCH: 4597 return "profile_last_fetch"; 4598 case NDB_DB_NOTE_KIND: 4599 return "note_kind_index"; 4600 case NDB_DB_NOTE_TEXT: 4601 return "note_fulltext"; 4602 case NDB_DBS: 4603 return "count"; 4604 } 4605 4606 return "unknown"; 4607 }