nostrdb.c (159221B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "ccan/crypto/sha256/sha256.h" 8 #include "bolt11/bolt11.h" 9 #include "bolt11/amount.h" 10 #include "lmdb.h" 11 #include "util.h" 12 #include "cpu.h" 13 #include "block.h" 14 #include "threadpool.h" 15 #include "protected_queue.h" 16 #include "memchr.h" 17 #include "print_util.h" 18 #include <stdlib.h> 19 #include <limits.h> 20 #include <assert.h> 21 22 #include "bindings/c/profile_json_parser.h" 23 #include "bindings/c/profile_builder.h" 24 #include "bindings/c/meta_builder.h" 25 #include "bindings/c/meta_reader.h" 26 #include "bindings/c/profile_verifier.h" 27 #include "secp256k1.h" 28 #include "secp256k1_ecdh.h" 29 #include "secp256k1_schnorrsig.h" 30 31 #define max(a,b) ((a) > (b) ? (a) : (b)) 32 #define min(a,b) ((a) < (b) ? (a) : (b)) 33 34 // the maximum number of things threads pop and push in bulk 35 static const int THREAD_QUEUE_BATCH = 4096; 36 37 // maximum number of active subscriptions 38 #define MAX_SUBSCRIPTIONS 256 39 #define MAX_SCAN_CURSORS 12 40 #define MAX_FILTERS 16 41 42 // the maximum size of inbox queues 43 static const int DEFAULT_QUEUE_SIZE = 1000000; 44 45 46 // increase if we need bigger filters 47 #define NDB_FILTER_PAGES 64 48 49 #define ndb_flag_set(flags, f) ((flags & f) == f) 50 51 #define NDB_PARSED_ID (1 << 0) 52 #define NDB_PARSED_PUBKEY (1 << 1) 53 #define NDB_PARSED_SIG (1 << 2) 54 #define NDB_PARSED_CREATED_AT (1 << 3) 55 #define NDB_PARSED_KIND (1 << 4) 56 #define NDB_PARSED_CONTENT (1 << 5) 57 #define NDB_PARSED_TAGS (1 << 6) 58 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 59 60 typedef int (*ndb_migrate_fn)(struct ndb *); 61 typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, 62 int word_index); 63 64 // these must be byte-aligned, they are directly accessing the serialized data 65 // representation 66 #pragma pack(push, 1) 67 68 union ndb_packed_str { 69 struct { 70 char str[3]; 71 // we assume little endian everywhere. sorry not sorry. 72 unsigned char flag; // NDB_PACKED_STR, etc 73 } packed; 74 75 uint32_t offset; 76 unsigned char bytes[4]; 77 }; 78 79 struct ndb_tag { 80 uint16_t count; 81 union ndb_packed_str strs[0]; 82 }; 83 84 struct ndb_tags { 85 uint16_t padding; 86 uint16_t count; 87 struct ndb_tag tag[0]; 88 }; 89 90 // v1 91 struct ndb_note { 92 unsigned char version; // v=1 93 unsigned char padding[3]; // keep things aligned 94 unsigned char id[32]; 95 unsigned char pubkey[32]; 96 unsigned char sig[64]; 97 98 uint64_t created_at; 99 uint32_t kind; 100 uint32_t content_length; 101 union ndb_packed_str content; 102 uint32_t strings; 103 // nothing can come after tags since it contains variadic data 104 struct ndb_tags tags; 105 }; 106 107 #pragma pack(pop) 108 109 110 struct ndb_migration { 111 ndb_migrate_fn fn; 112 }; 113 114 struct ndb_profile_record_builder { 115 flatcc_builder_t *builder; 116 void *flatbuf; 117 }; 118 119 // controls whether to continue or stop the json parser 120 enum ndb_idres { 121 NDB_IDRES_CONT, 122 NDB_IDRES_STOP, 123 }; 124 125 // closure data for the id-detecting ingest controller 126 struct ndb_ingest_controller 127 { 128 MDB_txn *read_txn; 129 struct ndb_lmdb *lmdb; 130 }; 131 132 enum ndb_writer_msgtype { 133 NDB_WRITER_QUIT, // kill thread immediately 134 NDB_WRITER_NOTE, // write a note to the db 135 NDB_WRITER_PROFILE, // write a profile to the db 136 NDB_WRITER_DBMETA, // write ndb metadata 137 NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched 138 NDB_WRITER_BLOCKS, // write parsed note blocks 139 }; 140 141 // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) 142 enum ndb_meta_key { 143 NDB_META_KEY_VERSION = 1 144 }; 145 146 struct ndb_json_parser { 147 const char *json; 148 int json_len; 149 struct ndb_builder builder; 150 jsmn_parser json_parser; 151 jsmntok_t *toks, *toks_end; 152 int i; 153 int num_tokens; 154 }; 155 156 // useful to pass to threads on its own 157 struct ndb_lmdb { 158 MDB_env *env; 159 MDB_dbi dbs[NDB_DBS]; 160 }; 161 162 struct ndb_writer { 163 struct ndb_lmdb *lmdb; 164 struct ndb_monitor *monitor; 165 166 void *queue_buf; 167 int queue_buflen; 168 pthread_t thread_id; 169 170 struct prot_queue inbox; 171 }; 172 173 struct ndb_ingester { 174 uint32_t flags; 175 struct threadpool tp; 176 struct ndb_writer *writer; 177 void *filter_context; 178 ndb_ingest_filter_fn filter; 179 }; 180 181 struct ndb_filter_group { 182 struct ndb_filter filters[MAX_FILTERS]; 183 int num_filters; 184 }; 185 186 struct ndb_subscription { 187 uint64_t subid; 188 struct ndb_filter_group group; 189 struct prot_queue inbox; 190 }; 191 192 struct ndb_monitor { 193 struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS]; 194 ndb_sub_fn sub_cb; 195 void *sub_cb_ctx; 196 int num_subscriptions; 197 }; 198 199 struct ndb { 200 struct ndb_lmdb lmdb; 201 struct ndb_ingester ingester; 202 struct ndb_monitor monitor; 203 struct ndb_writer writer; 204 int version; 205 uint32_t flags; // setting flags 206 // lmdb environ handles, etc 207 }; 208 209 /// 210 /// Query Plans 211 /// 212 /// There are general strategies for performing certain types of query 213 /// depending on the filter. For example, for large contact list queries 214 /// with many authors, we simply do a descending scan on created_at 215 /// instead of doing 1000s of pubkey scans. 216 /// 217 /// Query plans are calculated from filters via `ndb_filter_plan` 218 /// 219 enum ndb_query_plan { 220 NDB_PLAN_KINDS, 221 NDB_PLAN_IDS, 222 NDB_PLAN_AUTHORS, 223 NDB_PLAN_CREATED, 224 NDB_PLAN_TAGS, 225 }; 226 227 // A clustered key with an id and a timestamp 228 struct ndb_tsid { 229 unsigned char id[32]; 230 uint64_t timestamp; 231 }; 232 233 // A u64 + timestamp id. Just using this for kinds at the moment. 234 struct ndb_u64_tsid { 235 uint64_t u64; // kind, etc 236 uint64_t timestamp; 237 }; 238 239 struct ndb_word 240 { 241 const char *word; 242 int word_len; 243 }; 244 245 struct ndb_search_words 246 { 247 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 248 int num_words; 249 }; 250 251 // ndb_text_search_key 252 // 253 // This is compressed when in lmdb: 254 // 255 // note_id: varint 256 // strlen: varint 257 // str: cstr 258 // timestamp: varint 259 // word_index: varint 260 // 261 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 262 int word_index, int word_len, const char *str, 263 uint64_t timestamp, uint64_t note_id, 264 int *keysize) 265 { 266 struct cursor cur; 267 make_cursor(buf, buf + bufsize, &cur); 268 269 // TODO: need update this to uint64_t 270 // we push this first because our query function can pull this off 271 // quickly to check matches 272 if (!cursor_push_varint(&cur, (int32_t)note_id)) 273 return 0; 274 275 // string length 276 if (!cursor_push_varint(&cur, word_len)) 277 return 0; 278 279 // non-null terminated, lowercase string 280 if (!cursor_push_lowercase(&cur, str, word_len)) 281 return 0; 282 283 // TODO: need update this to uint64_t 284 if (!cursor_push_varint(&cur, (int)timestamp)) 285 return 0; 286 287 // the index of the word in the content so that we can do more accurate 288 // phrase searches 289 if (!cursor_push_varint(&cur, word_index)) 290 return 0; 291 292 // pad to 8-byte alignment 293 if (!cursor_align(&cur, 8)) 294 return 0; 295 296 *keysize = cur.p - cur.start; 297 assert((*keysize % 8) == 0); 298 299 return 1; 300 } 301 302 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 303 int wordlen, const char *word, 304 uint64_t timestamp, uint64_t note_id, 305 int *keysize) 306 { 307 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 308 timestamp, note_id, keysize); 309 } 310 311 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 312 int wordlen, const char *word, 313 int *keysize) 314 { 315 uint64_t timestamp, note_id; 316 timestamp = 0; 317 note_id = 0; 318 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 319 timestamp, note_id, keysize); 320 } 321 322 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 323 int wordlen, const char *word, 324 int *keysize) 325 { 326 uint64_t timestamp, note_id; 327 timestamp = INT32_MAX; 328 note_id = INT32_MAX; 329 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 330 timestamp, note_id, keysize); 331 } 332 333 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize); 334 335 /** From LMDB: Compare two items lexically */ 336 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 337 int diff; 338 ssize_t len_diff; 339 unsigned int len; 340 341 len = a->mv_size; 342 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 343 if (len_diff > 0) { 344 len = b->mv_size; 345 len_diff = 1; 346 } 347 348 diff = memcmp(a->mv_data, b->mv_data, len); 349 return diff ? diff : len_diff<0 ? -1 : len_diff; 350 } 351 352 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) 353 { 354 MDB_val va, vb; 355 uint64_t ts_a, ts_b; 356 int cmp; 357 358 va.mv_data = a->mv_data; 359 va.mv_size = a->mv_size - 8; 360 361 vb.mv_data = b->mv_data; 362 vb.mv_size = b->mv_size - 8; 363 364 if ((cmp = mdb_cmp_memn(&va, &vb))) 365 return cmp; 366 367 ts_a = *(uint64_t*)(va.mv_data + va.mv_size); 368 ts_b = *(uint64_t*)(vb.mv_data + vb.mv_size); 369 370 if (ts_a < ts_b) 371 return -1; 372 else if (ts_a > ts_b) 373 return 1; 374 375 return 0; 376 } 377 378 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 379 { 380 struct cursor ca, cb; 381 uint64_t sa, sb, nid_a, nid_b; 382 MDB_val a2, b2; 383 384 make_cursor(a->mv_data, a->mv_data + a->mv_size, &ca); 385 make_cursor(b->mv_data, b->mv_data + b->mv_size, &cb); 386 387 // note_id 388 if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b))) 389 return 0; 390 391 // string size 392 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 393 return 0; 394 395 a2.mv_data = ca.p; 396 a2.mv_size = sa; 397 398 b2.mv_data = cb.p; 399 b2.mv_size = sb; 400 401 int cmp = mdb_cmp_memn(&a2, &b2); 402 if (cmp) return cmp; 403 404 // skip over string 405 ca.p += sa; 406 cb.p += sb; 407 408 // timestamp 409 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 410 return 0; 411 412 if (sa < sb) return -1; 413 else if (sa > sb) return 1; 414 415 // note_id 416 if (nid_a < nid_b) return -1; 417 else if (nid_a > nid_b) return 1; 418 419 // word index 420 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 421 return 0; 422 423 if (sa < sb) return -1; 424 else if (sa > sb) return 1; 425 426 return 0; 427 } 428 429 static inline int ndb_unpack_text_search_key_noteid( 430 struct cursor *cur, uint64_t *note_id) 431 { 432 if (!cursor_pull_varint(cur, note_id)) 433 return 0; 434 435 return 1; 436 } 437 438 // faster peek of just the string instead of unpacking everything 439 // this is used to quickly discard range query matches if there is no 440 // common prefix 441 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 442 const char **str, 443 int *str_len) 444 { 445 uint64_t len; 446 447 if (!cursor_pull_varint(cur, &len)) 448 return 0; 449 450 *str_len = len; 451 452 *str = (const char *)cur->p; 453 454 if (!cursor_skip(cur, *str_len)) 455 return 0; 456 457 return 1; 458 } 459 460 // should be called after ndb_unpack_text_search_key_string. It continues 461 // the unpacking of a text search key if we've already started it. 462 static inline int 463 ndb_unpack_remaining_text_search_key(struct cursor *cur, 464 struct ndb_text_search_key *key) 465 { 466 if (!cursor_pull_varint(cur, &key->timestamp)) 467 return 0; 468 469 if (!cursor_pull_varint(cur, &key->word_index)) 470 return 0; 471 472 return 1; 473 } 474 475 // unpack a fulltext search key 476 // 477 // full version of string + unpack remaining. This is split up because text 478 // searching only requires to pull the string for prefix searching, and the 479 // remaining is optional 480 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 481 struct ndb_text_search_key *key) 482 { 483 struct cursor c; 484 make_cursor(p, p + len, &c); 485 486 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 487 return 0; 488 489 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 490 return 0; 491 492 return ndb_unpack_remaining_text_search_key(&c, key); 493 } 494 495 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 496 // `dst` and `src` are pointers to the destination and source strings, respectively. 497 // `n` is the maximum number of characters to copy. 498 static void lowercase_strncpy(char *dst, const char *src, int n) { 499 int j = 0, i = 0; 500 501 if (!dst || !src || n == 0) { 502 return; 503 } 504 505 while (src[i] != '\0' && j < n) { 506 dst[j++] = tolower(src[i++]); 507 } 508 509 // Null-terminate and fill the destination string 510 while (j < n) { 511 dst[j++] = '\0'; 512 } 513 } 514 515 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) { 516 return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID; 517 } 518 519 // Copy the filter 520 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src) 521 { 522 size_t src_size, elem_size, data_size; 523 524 memcpy(dst, src, sizeof(*src)); 525 526 elem_size = src->elem_buf.end - src->elem_buf.start; 527 data_size = src->data_buf.end - src->data_buf.start; 528 src_size = data_size + elem_size; 529 530 // let's only allow finalized filters to be cloned 531 if (!src || !src->finalized) 532 return 0; 533 534 dst->elem_buf.start = malloc(src_size); 535 dst->elem_buf.end = dst->elem_buf.start + elem_size; 536 dst->elem_buf.p = dst->elem_buf.end; 537 538 dst->data_buf.start = dst->elem_buf.start + elem_size; 539 dst->data_buf.end = dst->data_buf.start + data_size; 540 dst->data_buf.p = dst->data_buf.end; 541 542 if (dst->elem_buf.start == NULL) 543 return 0; 544 545 memcpy(dst->elem_buf.start, src->elem_buf.start, src_size); 546 547 return 1; 548 } 549 550 // "Finalize" the filter. This resizes the allocated heap buffers so that they 551 // are as small as possible. This also prevents new fields from being added 552 int ndb_filter_end(struct ndb_filter *filter) 553 { 554 #ifdef DEBUG 555 size_t orig_size; 556 #endif 557 size_t data_len, elem_len; 558 if (filter->finalized == 1) 559 return 0; 560 561 // move the data buffer to the end of the element buffer and update 562 // all of the element pointers accordingly 563 data_len = filter->data_buf.p - filter->data_buf.start; 564 elem_len = filter->elem_buf.p - filter->elem_buf.start; 565 #ifdef DEBUG 566 orig_size = filter->data_buf.end - filter->elem_buf.start; 567 #endif 568 569 // cap the elem buff 570 filter->elem_buf.end = filter->elem_buf.p; 571 572 // move the data buffer to the end of the element buffer 573 memmove(filter->elem_buf.p, filter->data_buf.start, data_len); 574 575 // realloc the whole thing 576 filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len); 577 filter->elem_buf.end = filter->elem_buf.start + elem_len; 578 filter->elem_buf.p = filter->elem_buf.end; 579 580 filter->data_buf.start = filter->elem_buf.end; 581 filter->data_buf.end = filter->data_buf.start + data_len; 582 filter->data_buf.p = filter->data_buf.end; 583 584 filter->finalized = 1; 585 586 ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len); 587 588 return 1; 589 } 590 591 static inline struct ndb_filter_elements * 592 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset) 593 { 594 struct ndb_filter_elements *els; 595 596 if (offset < 0) 597 return NULL; 598 599 els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset); 600 601 if ((unsigned char *)els > filter->elem_buf.p) 602 return NULL; 603 604 return els; 605 } 606 607 struct ndb_filter_elements * 608 ndb_filter_current_element(const struct ndb_filter *filter) 609 { 610 return ndb_filter_get_elements_by_offset(filter, filter->current); 611 } 612 613 struct ndb_filter_elements * 614 ndb_filter_get_elements(const struct ndb_filter *filter, int index) 615 { 616 if (filter->num_elements <= 0) 617 return NULL; 618 619 if (index > filter->num_elements-1) 620 return NULL; 621 622 return ndb_filter_get_elements_by_offset(filter, filter->elements[index]); 623 } 624 625 static inline unsigned char * 626 ndb_filter_elements_data(const struct ndb_filter *filter, int offset) 627 { 628 unsigned char *data; 629 630 if (offset < 0) 631 return NULL; 632 633 data = filter->data_buf.start + offset; 634 if (data > filter->data_buf.p) 635 return NULL; 636 637 return data; 638 } 639 640 unsigned char * 641 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 642 { 643 return ndb_filter_elements_data(filter, els->elements[index]); 644 } 645 646 const char * 647 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 648 { 649 return (const char *)ndb_filter_elements_data(filter, els->elements[index]); 650 } 651 652 uint64_t * 653 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index) 654 { 655 return &els->elements[index]; 656 } 657 658 uint64_t 659 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index) 660 { 661 return els->elements[index]; 662 } 663 664 int ndb_filter_init(struct ndb_filter *filter) 665 { 666 struct cursor cur; 667 int page_size, elem_pages, data_pages, buf_size; 668 669 page_size = 4096; // assuming this, not a big deal if we're wrong 670 elem_pages = NDB_FILTER_PAGES / 4; 671 data_pages = NDB_FILTER_PAGES - elem_pages; 672 buf_size = page_size * NDB_FILTER_PAGES; 673 674 unsigned char *buf = malloc(buf_size); 675 if (!buf) 676 return 0; 677 678 // init memory arena for the cursor 679 make_cursor(buf, buf + buf_size, &cur); 680 681 cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages); 682 cursor_slice(&cur, &filter->data_buf, page_size * data_pages); 683 684 // make sure we are fully allocated 685 assert(cur.p == cur.end); 686 687 // make sure elem_buf is the start of the buffer 688 assert(filter->elem_buf.start == cur.start); 689 690 filter->num_elements = 0; 691 filter->elements[0] = 0; 692 filter->current = -1; 693 filter->finalized = 0; 694 695 return 1; 696 } 697 698 void ndb_filter_destroy(struct ndb_filter *filter) 699 { 700 if (filter->elem_buf.start) 701 free(filter->elem_buf.start); 702 703 memset(filter, 0, sizeof(*filter)); 704 } 705 706 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 707 { 708 switch (field) { 709 case NDB_FILTER_IDS: return "ids"; 710 case NDB_FILTER_AUTHORS: return "authors"; 711 case NDB_FILTER_KINDS: return "kinds"; 712 case NDB_FILTER_TAGS: return "tags"; 713 case NDB_FILTER_SINCE: return "since"; 714 case NDB_FILTER_UNTIL: return "until"; 715 case NDB_FILTER_LIMIT: return "limit"; 716 } 717 718 return "unknown"; 719 } 720 721 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag) 722 { 723 int i; 724 struct ndb_filter_elements *els, *el; 725 726 if (ndb_filter_current_element(filter)) { 727 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 728 return 0; 729 } 730 731 // you can only start and end fields once 732 for (i = 0; i < filter->num_elements; i++) { 733 el = ndb_filter_get_elements(filter, i); 734 assert(el); 735 // TODO: fix this tags check to try to find matching tags 736 if (el->field.type == field && field != NDB_FILTER_TAGS) { 737 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 738 ndb_filter_field_name(field)); 739 return 0; 740 } 741 } 742 743 filter->current = filter->elem_buf.p - filter->elem_buf.start; 744 els = ndb_filter_current_element(filter); 745 assert(els); 746 747 // advance elem buffer to the variable data section 748 if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 749 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 750 ndb_filter_field_name(field)); 751 return 0; 752 } 753 754 els->field.type = field; 755 els->field.tag = tag; 756 els->field.elem_type = 0; 757 els->count = 0; 758 759 return 1; 760 } 761 762 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 763 { 764 return ndb_filter_start_field_impl(filter, field, 0); 765 } 766 767 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag) 768 { 769 return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag); 770 } 771 772 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 773 { 774 struct ndb_filter_elements *current; 775 uint64_t offset; 776 777 if (!(current = ndb_filter_current_element(filter))) 778 return 0; 779 780 offset = filter->data_buf.p - filter->data_buf.start; 781 782 switch (current->field.type) { 783 case NDB_FILTER_IDS: 784 case NDB_FILTER_AUTHORS: 785 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 786 return 0; 787 break; 788 case NDB_FILTER_KINDS: 789 offset = el.integer; 790 break; 791 case NDB_FILTER_SINCE: 792 case NDB_FILTER_UNTIL: 793 case NDB_FILTER_LIMIT: 794 // only one allowed for since/until 795 if (current->count != 0) 796 return 0; 797 offset = el.integer; 798 break; 799 case NDB_FILTER_TAGS: 800 switch (current->field.elem_type) { 801 case NDB_ELEMENT_ID: 802 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 803 return 0; 804 break; 805 case NDB_ELEMENT_STRING: 806 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 807 return 0; 808 if (!cursor_push_byte(&filter->data_buf, 0)) 809 return 0; 810 break; 811 case NDB_ELEMENT_INT: 812 // ints are not allowed in tag filters 813 case NDB_ELEMENT_UNKNOWN: 814 return 0; 815 } 816 // push a pointer of the string in the databuf as an element 817 break; 818 } 819 820 if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset, 821 sizeof(offset))) { 822 return 0; 823 } 824 825 current->count++; 826 827 return 1; 828 } 829 830 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 831 enum ndb_generic_element_type elem_type) 832 { 833 enum ndb_generic_element_type current_elem_type; 834 struct ndb_filter_elements *current; 835 836 if (!(current = ndb_filter_current_element(filter))) 837 return 0; 838 839 current_elem_type = current->field.elem_type; 840 841 // element types must be uniform 842 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 843 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 844 return 0; 845 } 846 847 current->field.elem_type = elem_type; 848 849 return 1; 850 } 851 852 853 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len) 854 { 855 union ndb_filter_element el; 856 struct ndb_filter_elements *current; 857 858 if (!(current = ndb_filter_current_element(filter))) 859 return 0; 860 861 // only generic queries are allowed to have strings 862 switch (current->field.type) { 863 case NDB_FILTER_SINCE: 864 case NDB_FILTER_UNTIL: 865 case NDB_FILTER_LIMIT: 866 case NDB_FILTER_IDS: 867 case NDB_FILTER_AUTHORS: 868 case NDB_FILTER_KINDS: 869 return 0; 870 case NDB_FILTER_TAGS: 871 break; 872 } 873 874 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 875 return 0; 876 877 el.string.string = str; 878 el.string.len = len; 879 880 return ndb_filter_add_element(filter, el); 881 } 882 883 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 884 { 885 return ndb_filter_add_str_element_len(filter, str, strlen(str)); 886 } 887 888 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 889 { 890 union ndb_filter_element el; 891 struct ndb_filter_elements *current; 892 if (!(current = ndb_filter_current_element(filter))) 893 return 0; 894 895 switch (current->field.type) { 896 case NDB_FILTER_IDS: 897 case NDB_FILTER_AUTHORS: 898 case NDB_FILTER_TAGS: 899 return 0; 900 case NDB_FILTER_KINDS: 901 case NDB_FILTER_SINCE: 902 case NDB_FILTER_UNTIL: 903 case NDB_FILTER_LIMIT: 904 break; 905 } 906 907 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT)) 908 return 0; 909 910 el.integer = integer; 911 912 return ndb_filter_add_element(filter, el); 913 } 914 915 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 916 { 917 union ndb_filter_element el; 918 struct ndb_filter_elements *current; 919 920 if (!(current = ndb_filter_current_element(filter))) 921 return 0; 922 923 // only certain filter types allow pushing id elements 924 switch (current->field.type) { 925 case NDB_FILTER_SINCE: 926 case NDB_FILTER_UNTIL: 927 case NDB_FILTER_LIMIT: 928 case NDB_FILTER_KINDS: 929 return 0; 930 case NDB_FILTER_IDS: 931 case NDB_FILTER_AUTHORS: 932 case NDB_FILTER_TAGS: 933 break; 934 } 935 936 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 937 return 0; 938 939 // this is needed so that generic filters know its an id 940 el.id = id; 941 942 return ndb_filter_add_element(filter, el); 943 } 944 945 static int ndb_tag_filter_matches(struct ndb_filter *filter, 946 struct ndb_filter_elements *els, 947 struct ndb_note *note) 948 { 949 int i; 950 const unsigned char *id; 951 const char *el_str; 952 struct ndb_iterator iter, *it = &iter; 953 struct ndb_str str; 954 955 ndb_tags_iterate_start(note, it); 956 957 while (ndb_tags_iterate_next(it)) { 958 // we're looking for tags with 2 or more entries: ["p", id], etc 959 if (it->tag->count < 2) 960 continue; 961 962 str = ndb_tag_str(note, it->tag, 0); 963 964 // we only care about packed strings (single char, etc) 965 if (str.flag != NDB_PACKED_STR) 966 continue; 967 968 // do we have #e matching e (or p, etc) 969 if (str.str[0] != els->field.tag || str.str[1] != 0) 970 continue; 971 972 str = ndb_tag_str(note, it->tag, 1); 973 974 switch (els->field.elem_type) { 975 case NDB_ELEMENT_ID: 976 // if our filter element type is an id, then we 977 // expect a packed id in the tag, otherwise skip 978 if (str.flag != NDB_PACKED_ID) 979 continue; 980 break; 981 case NDB_ELEMENT_STRING: 982 // if our filter element type is a string, then 983 // we should not expect an id 984 if (str.flag == NDB_PACKED_ID) 985 continue; 986 987 break; 988 case NDB_ELEMENT_UNKNOWN: 989 default: 990 // For some reason the element type is not set. It's 991 // possible nothing was added to the generic filter? 992 // Let's just fail here and log a note for debugging 993 fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type); 994 return 0; 995 } 996 997 for (i = 0; i < els->count; i++) { 998 switch (els->field.elem_type) { 999 case NDB_ELEMENT_ID: 1000 id = ndb_filter_get_id_element(filter, els, i); 1001 if (!memcmp(id, str.id, 32)) 1002 return 1; 1003 break; 1004 case NDB_ELEMENT_STRING: 1005 el_str = ndb_filter_get_string_element(filter, els, i); 1006 if (!strcmp(el_str, str.str)) 1007 return 1; 1008 break; 1009 case NDB_ELEMENT_INT: 1010 // int elements int tag queries are not supported 1011 case NDB_ELEMENT_UNKNOWN: 1012 return 0; 1013 } 1014 } 1015 } 1016 1017 return 0; 1018 } 1019 1020 struct search_id_state { 1021 struct ndb_filter *filter; 1022 struct ndb_filter_elements *els; 1023 unsigned char *key; 1024 }; 1025 1026 static int compare_ids(const void *pa, const void *pb) 1027 { 1028 unsigned char *a = *(unsigned char **)pa; 1029 unsigned char *b = *(unsigned char **)pb; 1030 1031 return memcmp(a, b, 32); 1032 } 1033 1034 static int search_ids(const void *ctx, const void *mid_ptr) 1035 { 1036 struct search_id_state *state; 1037 unsigned char *mid_id; 1038 uint32_t mid; 1039 1040 state = (struct search_id_state *)ctx; 1041 mid = *(uint32_t *)mid_ptr; 1042 1043 mid_id = ndb_filter_elements_data(state->filter, mid); 1044 assert(mid_id); 1045 1046 return memcmp(state->key, mid_id, 32); 1047 } 1048 1049 static int compare_kinds(const void *pa, const void *pb) 1050 { 1051 1052 // NOTE: this should match type in `union ndb_filter_element` 1053 uint64_t a = *(uint64_t *)pa; 1054 uint64_t b = *(uint64_t *)pb; 1055 1056 if (a < b) { 1057 return -1; 1058 } else if (a > b) { 1059 return 1; 1060 } else { 1061 return 0; 1062 } 1063 } 1064 1065 // 1066 // returns 1 if a filter matches a note 1067 static int ndb_filter_matches_with(struct ndb_filter *filter, 1068 struct ndb_note *note, int already_matched) 1069 { 1070 int i, j; 1071 struct ndb_filter_elements *els; 1072 struct search_id_state state; 1073 1074 state.filter = filter; 1075 1076 for (i = 0; i < filter->num_elements; i++) { 1077 els = ndb_filter_get_elements(filter, i); 1078 state.els = els; 1079 assert(els); 1080 1081 // if we know we already match from a query scan result, 1082 // we can skip this check 1083 if ((1 << els->field.type) & already_matched) 1084 continue; 1085 1086 switch (els->field.type) { 1087 case NDB_FILTER_KINDS: 1088 for (j = 0; j < els->count; j++) { 1089 if ((unsigned int)els->elements[j] == note->kind) 1090 goto cont; 1091 } 1092 break; 1093 case NDB_FILTER_IDS: 1094 state.key = ndb_note_id(note); 1095 if (bsearch(&state, &els->elements[0], els->count, 1096 sizeof(els->elements[0]), search_ids)) { 1097 continue; 1098 } 1099 break; 1100 case NDB_FILTER_AUTHORS: 1101 state.key = ndb_note_pubkey(note); 1102 if (bsearch(&state, &els->elements[0], els->count, 1103 sizeof(els->elements[0]), search_ids)) { 1104 continue; 1105 } 1106 break; 1107 case NDB_FILTER_TAGS: 1108 if (ndb_tag_filter_matches(filter, els, note)) 1109 continue; 1110 break; 1111 case NDB_FILTER_SINCE: 1112 assert(els->count == 1); 1113 if (note->created_at >= els->elements[0]) 1114 continue; 1115 break; 1116 case NDB_FILTER_UNTIL: 1117 assert(els->count == 1); 1118 if (note->created_at < els->elements[0]) 1119 continue; 1120 case NDB_FILTER_LIMIT: 1121 cont: 1122 continue; 1123 } 1124 1125 // all need to match 1126 return 0; 1127 } 1128 1129 return 1; 1130 } 1131 1132 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 1133 { 1134 return ndb_filter_matches_with(filter, note, 0); 1135 } 1136 1137 // because elements are stored as offsets and qsort doesn't support context, 1138 // we do a dumb thing where we convert elements to pointers and back if we 1139 // are doing an id or string sort 1140 static void sort_filter_elements(struct ndb_filter *filter, 1141 struct ndb_filter_elements *els, 1142 int (*cmp)(const void *, const void *)) 1143 { 1144 int i; 1145 1146 assert(ndb_filter_elem_is_ptr(&els->field)); 1147 1148 for (i = 0; i < els->count; i++) 1149 els->elements[i] += (uint64_t)filter->data_buf.start; 1150 1151 qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp); 1152 1153 for (i = 0; i < els->count; i++) 1154 els->elements[i] -= (uint64_t)filter->data_buf.start; 1155 } 1156 1157 void ndb_filter_end_field(struct ndb_filter *filter) 1158 { 1159 int cur_offset; 1160 struct ndb_filter_elements *cur; 1161 1162 cur_offset = filter->current; 1163 1164 if (!(cur = ndb_filter_current_element(filter))) 1165 return; 1166 1167 filter->elements[filter->num_elements++] = cur_offset; 1168 1169 // sort elements for binary search 1170 switch (cur->field.type) { 1171 case NDB_FILTER_IDS: 1172 case NDB_FILTER_AUTHORS: 1173 sort_filter_elements(filter, cur, compare_ids); 1174 break; 1175 case NDB_FILTER_KINDS: 1176 qsort(&cur->elements[0], cur->count, 1177 sizeof(cur->elements[0]), compare_kinds); 1178 break; 1179 case NDB_FILTER_TAGS: 1180 // TODO: generic tag search sorting 1181 break; 1182 case NDB_FILTER_SINCE: 1183 case NDB_FILTER_UNTIL: 1184 case NDB_FILTER_LIMIT: 1185 // don't need to sort these 1186 break; 1187 } 1188 1189 filter->current = -1; 1190 1191 } 1192 1193 static void ndb_filter_group_init(struct ndb_filter_group *group) 1194 { 1195 group->num_filters = 0; 1196 } 1197 1198 static int ndb_filter_group_add(struct ndb_filter_group *group, 1199 struct ndb_filter *filter) 1200 { 1201 if (group->num_filters + 1 > MAX_FILTERS) 1202 return 0; 1203 1204 return ndb_filter_clone(&group->filters[group->num_filters++], filter); 1205 } 1206 1207 static int ndb_filter_group_matches(struct ndb_filter_group *group, 1208 struct ndb_note *note) 1209 { 1210 int i; 1211 struct ndb_filter *filter; 1212 1213 if (group->num_filters == 0) 1214 return 1; 1215 1216 for (i = 0; i < group->num_filters; i++) { 1217 filter = &group->filters[i]; 1218 1219 if (ndb_filter_matches(filter, note)) 1220 return 1; 1221 } 1222 1223 return 0; 1224 } 1225 1226 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 1227 uint64_t timestamp, const char *search) 1228 { 1229 memcpy(key->id, id, 32); 1230 key->timestamp = timestamp; 1231 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1232 key->search[sizeof(key->search) - 1] = '\0'; 1233 } 1234 1235 static int ndb_write_profile_search_index(struct ndb_txn *txn, 1236 struct ndb_search_key *index_key, 1237 uint64_t profile_key) 1238 { 1239 int rc; 1240 MDB_val key, val; 1241 1242 key.mv_data = index_key; 1243 key.mv_size = sizeof(*index_key); 1244 val.mv_data = &profile_key; 1245 val.mv_size = sizeof(profile_key); 1246 1247 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1248 &key, &val, 0))) 1249 { 1250 ndb_debug("ndb_write_profile_search_index failed: %s\n", 1251 mdb_strerror(rc)); 1252 return 0; 1253 } 1254 1255 return 1; 1256 } 1257 1258 1259 // map usernames and display names to profile keys for user searching 1260 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 1261 struct ndb_note *note, 1262 uint64_t profile_key, 1263 void *profile_root) 1264 { 1265 struct ndb_search_key index; 1266 NdbProfileRecord_table_t profile_record; 1267 NdbProfile_table_t profile; 1268 1269 profile_record = NdbProfileRecord_as_root(profile_root); 1270 profile = NdbProfileRecord_profile_get(profile_record); 1271 1272 const char *name = NdbProfile_name_get(profile); 1273 const char *display_name = NdbProfile_display_name_get(profile); 1274 1275 // words + pubkey + created 1276 if (name) { 1277 ndb_make_search_key(&index, note->pubkey, note->created_at, 1278 name); 1279 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1280 return 0; 1281 } 1282 1283 if (display_name) { 1284 // don't write the same name/display_name twice 1285 if (name && !strcmp(display_name, name)) { 1286 return 1; 1287 } 1288 ndb_make_search_key(&index, note->pubkey, note->created_at, 1289 display_name); 1290 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1291 return 0; 1292 } 1293 1294 return 1; 1295 } 1296 1297 1298 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 1299 { 1300 txn->lmdb = &ndb->lmdb; 1301 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 1302 if (!txn->lmdb->env) 1303 return 0; 1304 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 1305 } 1306 1307 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 1308 { 1309 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 1310 } 1311 1312 // this should only be used in migrations, etc 1313 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 1314 { 1315 return _ndb_begin_query(ndb, txn, 0); 1316 } 1317 1318 1319 // Migrations 1320 // 1321 1322 static int ndb_migrate_user_search_indices(struct ndb *ndb) 1323 { 1324 int rc; 1325 MDB_cursor *cur; 1326 MDB_val k, v; 1327 void *profile_root; 1328 NdbProfileRecord_table_t record; 1329 struct ndb_txn txn; 1330 struct ndb_note *note; 1331 uint64_t note_key, profile_key; 1332 size_t len; 1333 int count; 1334 1335 if (!ndb_begin_rw_query(ndb, &txn)) { 1336 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n"); 1337 return 0; 1338 } 1339 1340 if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { 1341 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1342 return 0; 1343 } 1344 1345 count = 0; 1346 1347 // loop through all profiles and write search indices 1348 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1349 profile_root = v.mv_data; 1350 profile_key = *((uint64_t*)k.mv_data); 1351 record = NdbProfileRecord_as_root(profile_root); 1352 note_key = NdbProfileRecord_note_key(record); 1353 note = ndb_get_note_by_key(&txn, note_key, &len); 1354 1355 if (note == NULL) { 1356 fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n"); 1357 return 0; 1358 } 1359 1360 if (!ndb_write_profile_search_indices(&txn, note, profile_key, 1361 profile_root)) { 1362 1363 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 1364 return 0; 1365 } 1366 1367 count++; 1368 } 1369 1370 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 1371 1372 mdb_cursor_close(cur); 1373 1374 ndb_end_query(&txn); 1375 1376 return 1; 1377 } 1378 1379 static int ndb_migrate_lower_user_search_indices(struct ndb *ndb) 1380 { 1381 MDB_txn *txn; 1382 1383 if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) { 1384 fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n"); 1385 return 0; 1386 } 1387 1388 // just drop the search db so we can rebuild it 1389 if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) { 1390 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 1391 return 0; 1392 } 1393 1394 mdb_txn_commit(txn); 1395 1396 return ndb_migrate_user_search_indices(ndb); 1397 } 1398 1399 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 1400 1401 1402 int ndb_db_version(struct ndb *ndb) 1403 { 1404 int rc; 1405 uint64_t version, version_key; 1406 MDB_val k, v; 1407 MDB_txn *txn; 1408 1409 version_key = NDB_META_KEY_VERSION; 1410 k.mv_data = &version_key; 1411 k.mv_size = sizeof(version_key); 1412 1413 if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) { 1414 fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc); 1415 return -1; 1416 } 1417 1418 if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) { 1419 version = -1; 1420 } else { 1421 if (v.mv_size != 8) { 1422 fprintf(stderr, "run_migrations: invalid version size?"); 1423 return 0; 1424 } 1425 version = *((uint64_t*)v.mv_data); 1426 } 1427 1428 mdb_txn_abort(txn); 1429 return version; 1430 } 1431 1432 // custom kind+timestamp comparison function. This is used by lmdb to perform 1433 // b+ tree searches over the kind+timestamp index 1434 static int ndb_u64_tsid_compare(const MDB_val *a, const MDB_val *b) 1435 { 1436 struct ndb_u64_tsid *tsa, *tsb; 1437 tsa = a->mv_data; 1438 tsb = b->mv_data; 1439 1440 if (tsa->u64 < tsb->u64) 1441 return -1; 1442 else if (tsa->u64 > tsb->u64) 1443 return 1; 1444 1445 if (tsa->timestamp < tsb->timestamp) 1446 return -1; 1447 else if (tsa->timestamp > tsb->timestamp) 1448 return 1; 1449 1450 return 0; 1451 } 1452 1453 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1454 { 1455 struct ndb_tsid *tsa, *tsb; 1456 MDB_val a2 = *a, b2 = *b; 1457 1458 a2.mv_size = sizeof(tsa->id); 1459 b2.mv_size = sizeof(tsb->id); 1460 1461 int cmp = mdb_cmp_memn(&a2, &b2); 1462 if (cmp) return cmp; 1463 1464 tsa = a->mv_data; 1465 tsb = b->mv_data; 1466 1467 if (tsa->timestamp < tsb->timestamp) 1468 return -1; 1469 else if (tsa->timestamp > tsb->timestamp) 1470 return 1; 1471 return 0; 1472 } 1473 1474 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1475 { 1476 memcpy(key->id, id, 32); 1477 key->timestamp = 0; 1478 } 1479 1480 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1481 uint64_t timestamp) 1482 { 1483 memcpy(key->id, id, 32); 1484 key->timestamp = timestamp; 1485 } 1486 1487 static inline void ndb_u64_tsid_init(struct ndb_u64_tsid *key, uint64_t integer, 1488 uint64_t timestamp) 1489 { 1490 key->u64 = integer; 1491 key->timestamp = timestamp; 1492 } 1493 1494 // useful for range-searching for the latest key with a clustered created_at timen 1495 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1496 { 1497 memcpy(key->id, id, 32); 1498 key->timestamp = UINT64_MAX; 1499 } 1500 1501 enum ndb_ingester_msgtype { 1502 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1503 NDB_INGEST_QUIT, // kill ingester thread immediately 1504 }; 1505 1506 struct ndb_ingester_event { 1507 char *json; 1508 unsigned client : 1; // ["EVENT", {...}] messages 1509 unsigned len : 31; 1510 }; 1511 1512 struct ndb_writer_note { 1513 struct ndb_note *note; 1514 size_t note_len; 1515 }; 1516 1517 struct ndb_writer_profile { 1518 struct ndb_writer_note note; 1519 struct ndb_profile_record_builder record; 1520 }; 1521 1522 struct ndb_ingester_msg { 1523 enum ndb_ingester_msgtype type; 1524 union { 1525 struct ndb_ingester_event event; 1526 }; 1527 }; 1528 1529 struct ndb_writer_ndb_meta { 1530 // these are 64 bit because I'm paranoid of db-wide alignment issues 1531 uint64_t version; 1532 }; 1533 1534 // Used in the writer thread when writing ndb_profile_fetch_record's 1535 // kv = pubkey: recor 1536 struct ndb_writer_last_fetch { 1537 unsigned char pubkey[32]; 1538 uint64_t fetched_at; 1539 }; 1540 1541 // write note blocks 1542 struct ndb_writer_blocks { 1543 struct ndb_blocks *blocks; 1544 uint64_t note_key; 1545 }; 1546 1547 // The different types of messages that the writer thread can write to the 1548 // database 1549 struct ndb_writer_msg { 1550 enum ndb_writer_msgtype type; 1551 union { 1552 struct ndb_writer_note note; 1553 struct ndb_writer_profile profile; 1554 struct ndb_writer_ndb_meta ndb_meta; 1555 struct ndb_writer_last_fetch last_fetch; 1556 struct ndb_writer_blocks blocks; 1557 }; 1558 }; 1559 1560 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 1561 struct ndb_writer_msg *msg) 1562 { 1563 return prot_queue_push(&writer->inbox, msg); 1564 } 1565 1566 static int ndb_migrate_utf8_profile_names(struct ndb *ndb) 1567 { 1568 int rc; 1569 MDB_cursor *cur; 1570 MDB_val k, v; 1571 void *profile_root; 1572 NdbProfileRecord_table_t record; 1573 struct ndb_txn txn; 1574 struct ndb_note *note, *copied_note; 1575 uint64_t note_key; 1576 size_t len; 1577 int count, failed; 1578 struct ndb_writer_msg out; 1579 1580 if (!ndb_begin_rw_query(ndb, &txn)) { 1581 fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n"); 1582 return 0; 1583 } 1584 1585 if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { 1586 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 1587 return 0; 1588 } 1589 1590 count = 0; 1591 failed = 0; 1592 1593 // loop through all profiles and write search indices 1594 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1595 profile_root = v.mv_data; 1596 record = NdbProfileRecord_as_root(profile_root); 1597 note_key = NdbProfileRecord_note_key(record); 1598 note = ndb_get_note_by_key(&txn, note_key, &len); 1599 1600 if (note == NULL) { 1601 fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n"); 1602 return 0; 1603 } 1604 1605 struct ndb_profile_record_builder *b = &out.profile.record; 1606 1607 // reprocess profile 1608 if (!ndb_process_profile_note(note, b)) { 1609 failed++; 1610 continue; 1611 } 1612 1613 // the writer needs to own this note, and its expected to free it 1614 copied_note = malloc(len); 1615 memcpy(copied_note, note, len); 1616 1617 out.type = NDB_WRITER_PROFILE; 1618 out.profile.note.note = copied_note; 1619 out.profile.note.note_len = len; 1620 1621 ndb_writer_queue_msg(&ndb->writer, &out); 1622 1623 count++; 1624 } 1625 1626 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 1627 1628 if (failed != 0) { 1629 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 1630 } 1631 1632 mdb_cursor_close(cur); 1633 1634 ndb_end_query(&txn); 1635 1636 return 1; 1637 } 1638 1639 static struct ndb_migration MIGRATIONS[] = { 1640 { .fn = ndb_migrate_user_search_indices }, 1641 { .fn = ndb_migrate_lower_user_search_indices }, 1642 { .fn = ndb_migrate_utf8_profile_names } 1643 }; 1644 1645 1646 int ndb_end_query(struct ndb_txn *txn) 1647 { 1648 // this works on read or write queries. 1649 return mdb_txn_commit(txn->mdb_txn) == 0; 1650 } 1651 1652 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 1653 unsigned char sig[64]) 1654 { 1655 secp256k1_xonly_pubkey xonly_pubkey; 1656 int ok; 1657 1658 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 1659 pubkey) != 0; 1660 if (!ok) return 0; 1661 1662 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 1663 &xonly_pubkey) > 0; 1664 if (!ok) return 0; 1665 1666 return 1; 1667 } 1668 1669 static inline int ndb_writer_queue_msgs(struct ndb_writer *writer, 1670 struct ndb_writer_msg *msgs, 1671 int num_msgs) 1672 { 1673 return prot_queue_push_all(&writer->inbox, msgs, num_msgs); 1674 } 1675 1676 static int ndb_writer_queue_note(struct ndb_writer *writer, 1677 struct ndb_note *note, size_t note_len) 1678 { 1679 struct ndb_writer_msg msg; 1680 msg.type = NDB_WRITER_NOTE; 1681 1682 msg.note.note = note; 1683 msg.note.note_len = note_len; 1684 1685 return prot_queue_push(&writer->inbox, &msg); 1686 } 1687 1688 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 1689 const unsigned char *pubkey, 1690 uint64_t fetched_at) 1691 { 1692 int rc; 1693 MDB_val key, val; 1694 1695 key.mv_data = (unsigned char*)pubkey; 1696 key.mv_size = 32; 1697 val.mv_data = &fetched_at; 1698 val.mv_size = sizeof(fetched_at); 1699 1700 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 1701 &key, &val, 0))) 1702 { 1703 ndb_debug("write version to ndb_meta failed: %s\n", 1704 mdb_strerror(rc)); 1705 return; 1706 } 1707 1708 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 1709 } 1710 1711 1712 // We just received a profile that we haven't processed yet, but it could 1713 // be an older one! Make sure we only write last fetched profile if it's a new 1714 // one 1715 // 1716 // To do this, we first check the latest profile in the database. If the 1717 // created_date for this profile note is newer, then we write a 1718 // last_profile_fetch record, otherwise we do not. 1719 // 1720 // WARNING: This function is only valid when called from the writer thread 1721 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 1722 struct ndb_note *note) 1723 { 1724 size_t len; 1725 uint64_t profile_key, note_key; 1726 void *root; 1727 struct ndb_note *last_profile; 1728 NdbProfileRecord_table_t record; 1729 1730 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 1731 record = NdbProfileRecord_as_root(root); 1732 note_key = NdbProfileRecord_note_key(record); 1733 last_profile = ndb_get_note_by_key(txn, note_key, &len); 1734 if (last_profile == NULL) { 1735 return 0; 1736 } 1737 1738 // found profile, let's see if it's newer than ours 1739 if (note->created_at > last_profile->created_at) { 1740 // this is a new profile note, record last fetched time 1741 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1742 } 1743 } else { 1744 // couldn't fetch profile. record last fetched time 1745 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1746 } 1747 1748 return 1; 1749 } 1750 1751 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 1752 uint64_t fetched_at) 1753 { 1754 struct ndb_writer_msg msg; 1755 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 1756 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 1757 msg.last_fetch.fetched_at = fetched_at; 1758 1759 return ndb_writer_queue_msg(&ndb->writer, &msg); 1760 } 1761 1762 1763 // When doing cursor scans from greatest to lowest, this function positions the 1764 // cursor at the first element before descending. MDB_SET_RANGE puts us right 1765 // after the first element, so we have to go back one. 1766 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v) 1767 { 1768 // Position cursor at the next key greater than or equal to the 1769 // specified key 1770 if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) { 1771 // Failed :(. It could be the last element? 1772 if (mdb_cursor_get(cur, k, v, MDB_LAST)) 1773 return 0; 1774 } else { 1775 // if set range worked and our key exists, it should be 1776 // the one right before this one 1777 if (mdb_cursor_get(cur, k, v, MDB_PREV)) 1778 return 0; 1779 } 1780 1781 return 1; 1782 } 1783 1784 // get some value based on a clustered id key 1785 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 1786 MDB_val *val) 1787 { 1788 MDB_val k, v; 1789 MDB_cursor *cur; 1790 int success = 0, rc; 1791 struct ndb_tsid tsid; 1792 1793 // position at the most recent 1794 ndb_tsid_high(&tsid, id); 1795 1796 k.mv_data = &tsid; 1797 k.mv_size = sizeof(tsid); 1798 1799 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 1800 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 1801 return 0; 1802 } 1803 1804 if (!ndb_cursor_start(cur, &k, &v)) 1805 goto cleanup; 1806 1807 if (memcmp(k.mv_data, id, 32) == 0) { 1808 *val = v; 1809 success = 1; 1810 } 1811 1812 cleanup: 1813 mdb_cursor_close(cur); 1814 return success; 1815 } 1816 1817 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 1818 enum ndb_dbs store, size_t *len) 1819 { 1820 MDB_val k, v; 1821 1822 k.mv_data = &key; 1823 k.mv_size = sizeof(key); 1824 1825 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 1826 ndb_debug("ndb_lookup_by_key: mdb_get note failed\n"); 1827 return NULL; 1828 } 1829 1830 if (len) 1831 *len = v.mv_size; 1832 1833 return v.mv_data; 1834 } 1835 1836 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 1837 enum ndb_dbs store, const unsigned char *pk, 1838 size_t *len, uint64_t *primkey) 1839 { 1840 MDB_val k, v; 1841 void *res = NULL; 1842 if (len) 1843 *len = 0; 1844 1845 if (!ndb_get_tsid(txn, ind, pk, &k)) { 1846 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 1847 return 0; 1848 } 1849 1850 if (primkey) 1851 *primkey = *(uint64_t*)k.mv_data; 1852 1853 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 1854 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 1855 return 0; 1856 } 1857 1858 res = v.mv_data; 1859 assert(((uint64_t)res % 4) == 0); 1860 if (len) 1861 *len = v.mv_size; 1862 return res; 1863 } 1864 1865 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 1866 { 1867 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 1868 } 1869 1870 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 1871 { 1872 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key); 1873 } 1874 1875 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 1876 enum ndb_dbs db, 1877 const unsigned char *id) 1878 { 1879 MDB_val k; 1880 1881 if (!ndb_get_tsid(txn, db, id, &k)) 1882 return 0; 1883 1884 return *(uint32_t*)k.mv_data; 1885 } 1886 1887 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 1888 { 1889 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 1890 } 1891 1892 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 1893 { 1894 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 1895 } 1896 1897 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 1898 { 1899 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 1900 } 1901 1902 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 1903 { 1904 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 1905 } 1906 1907 uint64_t 1908 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 1909 { 1910 MDB_val k, v; 1911 1912 k.mv_data = (unsigned char*)pubkey; 1913 k.mv_size = 32; 1914 1915 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 1916 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 1917 return 0; 1918 } 1919 1920 return *((uint64_t*)v.mv_data); 1921 } 1922 1923 1924 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 1925 { 1926 MDB_val val; 1927 1928 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 1929 return 0; 1930 1931 return 1; 1932 } 1933 1934 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 1935 MDB_txn *mdb_txn) 1936 { 1937 txn->lmdb = lmdb; 1938 txn->mdb_txn = mdb_txn; 1939 } 1940 1941 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 1942 { 1943 unsigned char id[32]; 1944 struct ndb_ingest_controller *c = data; 1945 struct ndb_txn txn; 1946 1947 hex_decode(hexid, 64, id, sizeof(id)); 1948 1949 // let's see if we already have it 1950 1951 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 1952 if (!ndb_has_note(&txn, id)) 1953 return NDB_IDRES_CONT; 1954 1955 return NDB_IDRES_STOP; 1956 } 1957 1958 static int ndbprofile_parse_json(flatcc_builder_t *B, 1959 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 1960 { 1961 flatcc_json_parser_t parser, *ctx = &parser; 1962 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 1963 1964 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 1965 return 0; 1966 1967 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 1968 if (ctx->error) 1969 return 0; 1970 1971 if (!flatcc_builder_end_buffer(B, *profile)) 1972 return 0; 1973 1974 ctx->end_loc = buf; 1975 1976 1977 return 1; 1978 } 1979 1980 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 1981 { 1982 b->builder = malloc(sizeof(*b->builder)); 1983 b->flatbuf = NULL; 1984 } 1985 1986 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 1987 { 1988 if (b->builder) 1989 free(b->builder); 1990 if (b->flatbuf) 1991 free(b->flatbuf); 1992 1993 b->builder = NULL; 1994 b->flatbuf = NULL; 1995 } 1996 1997 int ndb_process_profile_note(struct ndb_note *note, 1998 struct ndb_profile_record_builder *profile) 1999 { 2000 int res; 2001 2002 NdbProfile_ref_t profile_table; 2003 flatcc_builder_t *builder; 2004 2005 ndb_profile_record_builder_init(profile); 2006 builder = profile->builder; 2007 flatcc_builder_init(builder); 2008 2009 NdbProfileRecord_start_as_root(builder); 2010 2011 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 2012 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 2013 note->content_length, 2014 flatcc_json_parser_f_skip_unknown, 2015 &profile_table))) 2016 { 2017 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 2018 note->content_length, ndb_note_content(note)); 2019 ndb_profile_record_builder_free(profile); 2020 return 0; 2021 } 2022 2023 uint64_t received_at = time(NULL); 2024 const char *lnurl = "fixme"; 2025 2026 NdbProfileRecord_profile_add(builder, profile_table); 2027 NdbProfileRecord_received_at_add(builder, received_at); 2028 2029 flatcc_builder_ref_t lnurl_off; 2030 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 2031 2032 NdbProfileRecord_lnurl_add(builder, lnurl_off); 2033 2034 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 2035 return 1; 2036 } 2037 2038 static int ndb_ingester_process_note(secp256k1_context *ctx, 2039 struct ndb_note *note, 2040 size_t note_size, 2041 struct ndb_writer_msg *out, 2042 struct ndb_ingester *ingester) 2043 { 2044 enum ndb_ingest_filter_action action; 2045 action = NDB_INGEST_ACCEPT; 2046 2047 if (ingester->filter) 2048 action = ingester->filter(ingester->filter_context, note); 2049 2050 if (action == NDB_INGEST_REJECT) 2051 return 0; 2052 2053 // some special situations we might want to skip sig validation, 2054 // like during large imports 2055 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 2056 // if we're skipping validation we don't need to verify 2057 } else { 2058 // verify! If it's an invalid note we don't need to 2059 // bother writing it to the database 2060 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 2061 ndb_debug("signature verification failed\n"); 2062 return 0; 2063 } 2064 } 2065 2066 // we didn't find anything. let's send it 2067 // to the writer thread 2068 note = realloc(note, note_size); 2069 assert(((uint64_t)note % 4) == 0); 2070 2071 if (note->kind == 0) { 2072 struct ndb_profile_record_builder *b = 2073 &out->profile.record; 2074 2075 ndb_process_profile_note(note, b); 2076 2077 out->type = NDB_WRITER_PROFILE; 2078 out->profile.note.note = note; 2079 out->profile.note.note_len = note_size; 2080 } else { 2081 out->type = NDB_WRITER_NOTE; 2082 out->note.note = note; 2083 out->note.note_len = note_size; 2084 } 2085 2086 return 1; 2087 } 2088 2089 2090 static int ndb_ingester_process_event(secp256k1_context *ctx, 2091 struct ndb_ingester *ingester, 2092 struct ndb_ingester_event *ev, 2093 struct ndb_writer_msg *out, 2094 MDB_txn *read_txn 2095 ) 2096 { 2097 struct ndb_tce tce; 2098 struct ndb_fce fce; 2099 struct ndb_note *note; 2100 struct ndb_ingest_controller controller; 2101 struct ndb_id_cb cb; 2102 void *buf; 2103 int ok; 2104 size_t bufsize, note_size; 2105 2106 ok = 0; 2107 2108 // we will use this to check if we already have it in the DB during 2109 // ID parsing 2110 controller.read_txn = read_txn; 2111 controller.lmdb = ingester->writer->lmdb; 2112 cb.fn = ndb_ingester_json_controller; 2113 cb.data = &controller; 2114 2115 // since we're going to be passing this allocated note to a different 2116 // thread, we can't use thread-local buffers. just allocate a block 2117 bufsize = max(ev->len * 8.0, 4096); 2118 buf = malloc(bufsize); 2119 if (!buf) { 2120 ndb_debug("couldn't malloc buf\n"); 2121 return 0; 2122 } 2123 2124 note_size = 2125 ev->client ? 2126 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 2127 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 2128 2129 if ((int)note_size == -42) { 2130 // we already have this! 2131 //ndb_debug("already have id??\n"); 2132 goto cleanup; 2133 } else if (note_size == 0) { 2134 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 2135 goto cleanup; 2136 } 2137 2138 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 2139 2140 if (ev->client) { 2141 switch (fce.evtype) { 2142 case NDB_FCE_EVENT: 2143 note = fce.event.note; 2144 if (note != buf) { 2145 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2146 goto cleanup; 2147 } 2148 2149 if (!ndb_ingester_process_note(ctx, note, note_size, 2150 out, ingester)) { 2151 ndb_debug("failed to process note\n"); 2152 goto cleanup; 2153 } else { 2154 // we're done with the original json, free it 2155 free(ev->json); 2156 return 1; 2157 } 2158 } 2159 } else { 2160 switch (tce.evtype) { 2161 case NDB_TCE_AUTH: goto cleanup; 2162 case NDB_TCE_NOTICE: goto cleanup; 2163 case NDB_TCE_EOSE: goto cleanup; 2164 case NDB_TCE_OK: goto cleanup; 2165 case NDB_TCE_EVENT: 2166 note = tce.event.note; 2167 if (note != buf) { 2168 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2169 goto cleanup; 2170 } 2171 2172 if (!ndb_ingester_process_note(ctx, note, note_size, 2173 out, ingester)) { 2174 ndb_debug("failed to process note\n"); 2175 goto cleanup; 2176 } else { 2177 // we're done with the original json, free it 2178 free(ev->json); 2179 return 1; 2180 } 2181 } 2182 } 2183 2184 2185 cleanup: 2186 free(ev->json); 2187 free(buf); 2188 2189 return ok; 2190 } 2191 2192 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 2193 { 2194 MDB_cursor *mc; 2195 MDB_val key, val; 2196 2197 if (mdb_cursor_open(txn, db, &mc)) 2198 return 0; 2199 2200 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 2201 mdb_cursor_close(mc); 2202 return 0; 2203 } 2204 2205 mdb_cursor_close(mc); 2206 2207 assert(key.mv_size == 8); 2208 return *((uint64_t*)key.mv_data); 2209 } 2210 2211 // 2212 // make a search key meant for user queries without any other note info 2213 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 2214 { 2215 memset(key->id, 0, sizeof(key->id)); 2216 key->timestamp = 0; 2217 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 2218 key->search[sizeof(key->search) - 1] = '\0'; 2219 } 2220 2221 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 2222 { 2223 int rc; 2224 struct ndb_search_key s; 2225 MDB_val k, v; 2226 search->cursor = NULL; 2227 2228 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 2229 2230 ndb_make_search_key_low(&s, query); 2231 2232 k.mv_data = &s; 2233 k.mv_size = sizeof(s); 2234 2235 if ((rc = mdb_cursor_open(txn->mdb_txn, 2236 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 2237 cursor))) { 2238 printf("search_profile: cursor opened failed: %s\n", 2239 mdb_strerror(rc)); 2240 return 0; 2241 } 2242 2243 // Position cursor at the next key greater than or equal to the specified key 2244 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 2245 printf("search_profile: cursor get failed\n"); 2246 goto cleanup; 2247 } else { 2248 search->key = k.mv_data; 2249 assert(v.mv_size == 8); 2250 search->profile_key = *((uint64_t*)v.mv_data); 2251 return 1; 2252 } 2253 2254 cleanup: 2255 mdb_cursor_close(search->cursor); 2256 search->cursor = NULL; 2257 return 0; 2258 } 2259 2260 void ndb_search_profile_end(struct ndb_search *search) 2261 { 2262 if (search->cursor) 2263 mdb_cursor_close(search->cursor); 2264 } 2265 2266 int ndb_search_profile_next(struct ndb_search *search) 2267 { 2268 int rc; 2269 MDB_val k, v; 2270 unsigned char *init_id; 2271 2272 init_id = search->key->id; 2273 k.mv_data = search->key; 2274 k.mv_size = sizeof(*search->key); 2275 2276 retry: 2277 if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) { 2278 ndb_debug("ndb_search_profile_next: %s\n", 2279 mdb_strerror(rc)); 2280 return 0; 2281 } else { 2282 search->key = k.mv_data; 2283 assert(v.mv_size == 8); 2284 search->profile_key = *((uint64_t*)v.mv_data); 2285 2286 // skip duplicate pubkeys 2287 if (!memcmp(init_id, search->key->id, 32)) 2288 goto retry; 2289 } 2290 2291 return 1; 2292 } 2293 2294 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 2295 { 2296 int cmp; 2297 struct ndb_search_key *ska, *skb; 2298 2299 ska = a->mv_data; 2300 skb = b->mv_data; 2301 2302 MDB_val a2 = *a; 2303 MDB_val b2 = *b; 2304 2305 a2.mv_data = ska->search; 2306 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 2307 2308 cmp = mdb_cmp_memn(&a2, &b2); 2309 if (cmp) return cmp; 2310 2311 if (ska->timestamp < skb->timestamp) 2312 return -1; 2313 else if (ska->timestamp > skb->timestamp) 2314 return 1; 2315 2316 return 0; 2317 } 2318 2319 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 2320 2321 { 2322 MDB_val key, val; 2323 int rc; 2324 struct ndb_tsid tsid; 2325 MDB_dbi pk_db; 2326 2327 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 2328 2329 // write profile_pk + created_at index 2330 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 2331 2332 key.mv_data = &tsid; 2333 key.mv_size = sizeof(tsid); 2334 val.mv_data = &profile_key; 2335 val.mv_size = sizeof(profile_key); 2336 2337 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 2338 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 2339 profile_key, mdb_strerror(rc)); 2340 return 0; 2341 } 2342 2343 return 1; 2344 } 2345 2346 static int ndb_write_profile(struct ndb_txn *txn, 2347 struct ndb_writer_profile *profile, 2348 uint64_t note_key) 2349 { 2350 uint64_t profile_key; 2351 struct ndb_note *note; 2352 void *flatbuf; 2353 size_t flatbuf_len; 2354 int rc; 2355 2356 MDB_val key, val; 2357 MDB_dbi profile_db; 2358 2359 note = profile->note.note; 2360 2361 // add note_key to profile record 2362 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 2363 NdbProfileRecord_end_as_root(profile->record.builder); 2364 2365 flatbuf = profile->record.flatbuf = 2366 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 2367 2368 assert(((uint64_t)flatbuf % 8) == 0); 2369 2370 // TODO: this may not be safe!? 2371 flatbuf_len = (flatbuf_len + 7) & ~7; 2372 2373 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 2374 2375 // get dbs 2376 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 2377 2378 // get new key 2379 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 2380 2381 // write profile to profile store 2382 key.mv_data = &profile_key; 2383 key.mv_size = sizeof(profile_key); 2384 val.mv_data = flatbuf; 2385 val.mv_size = flatbuf_len; 2386 2387 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 2388 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 2389 return 0; 2390 } 2391 2392 // write last fetched record 2393 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 2394 ndb_debug("failed to write last profile fetched record\n"); 2395 } 2396 2397 // write profile pubkey index 2398 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 2399 ndb_debug("failed to write profile pubkey index\n"); 2400 return 0; 2401 } 2402 2403 // write name, display_name profile search indices 2404 if (!ndb_write_profile_search_indices(txn, note, profile_key, 2405 flatbuf)) { 2406 ndb_debug("failed to write profile search indices\n"); 2407 return 0; 2408 } 2409 2410 return 1; 2411 } 2412 2413 // find the last id tag in a note (e, p, etc) 2414 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 2415 { 2416 unsigned char *last = NULL; 2417 struct ndb_iterator iter; 2418 struct ndb_str str; 2419 2420 // get the liked event id (last id) 2421 ndb_tags_iterate_start(note, &iter); 2422 2423 while (ndb_tags_iterate_next(&iter)) { 2424 if (iter.tag->count < 2) 2425 continue; 2426 2427 str = ndb_tag_str(note, iter.tag, 0); 2428 2429 // assign liked to the last e tag 2430 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 2431 str = ndb_tag_str(note, iter.tag, 1); 2432 if (str.flag == NDB_PACKED_ID) 2433 last = str.id; 2434 } 2435 } 2436 2437 return last; 2438 } 2439 2440 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 2441 { 2442 MDB_val k, v; 2443 2444 k.mv_data = (unsigned char*)id; 2445 k.mv_size = 32; 2446 2447 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 2448 //ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 2449 return NULL; 2450 } 2451 2452 if (len) 2453 *len = v.mv_size; 2454 2455 return v.mv_data; 2456 } 2457 2458 // When receiving a reaction note, look for the liked id and increase the 2459 // reaction counter in the note metadata database 2460 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 2461 { 2462 size_t len; 2463 void *root; 2464 int reactions, rc; 2465 MDB_val key, val; 2466 NdbEventMeta_table_t meta; 2467 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 2468 2469 if (liked == NULL) 2470 return 0; 2471 2472 root = ndb_get_note_meta(txn, liked, &len); 2473 2474 flatcc_builder_t builder; 2475 flatcc_builder_init(&builder); 2476 NdbEventMeta_start_as_root(&builder); 2477 2478 // no meta record, let's make one 2479 if (root == NULL) { 2480 NdbEventMeta_reactions_add(&builder, 1); 2481 } else { 2482 // clone existing and add to it 2483 meta = NdbEventMeta_as_root(root); 2484 2485 reactions = NdbEventMeta_reactions_get(meta); 2486 NdbEventMeta_clone(&builder, meta); 2487 NdbEventMeta_reactions_add(&builder, reactions + 1); 2488 } 2489 2490 NdbProfileRecord_end_as_root(&builder); 2491 root = flatcc_builder_finalize_aligned_buffer(&builder, &len); 2492 assert(((uint64_t)root % 8) == 0); 2493 2494 if (root == NULL) { 2495 ndb_debug("failed to create note metadata record\n"); 2496 return 0; 2497 } 2498 2499 // metadata is keyed on id because we want to collect stats regardless 2500 // if we have the note yet or not 2501 key.mv_data = liked; 2502 key.mv_size = 32; 2503 2504 val.mv_data = root; 2505 val.mv_size = len; 2506 2507 // write the new meta record 2508 //ndb_debug("writing stats record for "); 2509 //print_hex(liked, 32); 2510 //ndb_debug("\n"); 2511 2512 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 2513 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 2514 return 0; 2515 } 2516 2517 free(root); 2518 2519 return 1; 2520 } 2521 2522 2523 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 2524 uint64_t note_key) 2525 2526 { 2527 struct ndb_tsid tsid; 2528 int rc; 2529 MDB_val key, val; 2530 MDB_dbi id_db; 2531 2532 ndb_tsid_init(&tsid, note->id, note->created_at); 2533 2534 key.mv_data = &tsid; 2535 key.mv_size = sizeof(tsid); 2536 val.mv_data = ¬e_key; 2537 val.mv_size = sizeof(note_key); 2538 2539 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2540 2541 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 2542 ndb_debug("write note id index to db failed: %s\n", 2543 mdb_strerror(rc)); 2544 return 0; 2545 } 2546 2547 return 1; 2548 } 2549 2550 static int ndb_filter_group_add_filters(struct ndb_filter_group *group, 2551 struct ndb_filter *filters, 2552 int num_filters) 2553 { 2554 int i; 2555 2556 for (i = 0; i < num_filters; i++) { 2557 if (!ndb_filter_group_add(group, &filters[i])) 2558 return 0; 2559 } 2560 2561 return 1; 2562 } 2563 2564 2565 static struct ndb_filter_elements * 2566 ndb_filter_get_elems(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2567 { 2568 int i; 2569 struct ndb_filter_elements *els; 2570 2571 for (i = 0; i < filter->num_elements; i++) { 2572 els = ndb_filter_get_elements(filter, i); 2573 assert(els); 2574 if (els->field.type == typ) { 2575 return els; 2576 } 2577 } 2578 2579 return NULL; 2580 } 2581 2582 static uint64_t * 2583 ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2584 { 2585 struct ndb_filter_elements *els; 2586 if ((els = ndb_filter_get_elems(filter, typ))) 2587 return &els->elements[0]; 2588 return NULL; 2589 } 2590 2591 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter, 2592 enum ndb_filter_fieldtype typ) 2593 { 2594 uint64_t *el; 2595 if (NULL == (el = ndb_filter_get_elem(filter, typ))) 2596 return NULL; 2597 return el; 2598 } 2599 2600 static inline int push_query_result(struct ndb_query_results *results, 2601 struct ndb_query_result *result) 2602 { 2603 return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result)); 2604 } 2605 2606 static int compare_query_results(const void *pa, const void *pb) 2607 { 2608 struct ndb_query_result *a, *b; 2609 2610 a = (struct ndb_query_result *)pa; 2611 b = (struct ndb_query_result *)pb; 2612 2613 if (a->note->created_at == b->note->created_at) { 2614 return 0; 2615 } else if (a->note->created_at > b->note->created_at) { 2616 return -1; 2617 } else { 2618 return 1; 2619 } 2620 } 2621 2622 static void ndb_query_result_init(struct ndb_query_result *res, 2623 struct ndb_note *note, 2624 uint64_t note_size, 2625 uint64_t note_id) 2626 { 2627 *res = (struct ndb_query_result){ 2628 .note_id = note_id, 2629 .note_size = note_size, 2630 .note = note, 2631 }; 2632 } 2633 2634 static int query_is_full(struct ndb_query_results *results, int limit) 2635 { 2636 if (results->cur.p >= results->cur.end) 2637 return 1; 2638 2639 return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit; 2640 } 2641 2642 static int ndb_query_plan_execute_ids(struct ndb_txn *txn, 2643 struct ndb_filter *filter, 2644 struct ndb_query_results *results, 2645 int limit 2646 ) 2647 { 2648 MDB_cursor *cur; 2649 MDB_dbi db; 2650 MDB_val k, v; 2651 int matched, rc, i; 2652 struct ndb_filter_elements *ids; 2653 struct ndb_note *note; 2654 struct ndb_query_result res; 2655 struct ndb_tsid tsid, *ptsid; 2656 uint64_t note_id, until, *pint; 2657 size_t note_size; 2658 unsigned char *id; 2659 2660 matched = 0; 2661 until = UINT64_MAX; 2662 2663 if (!(ids = ndb_filter_get_elems(filter, NDB_FILTER_IDS))) 2664 return 0; 2665 2666 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 2667 until = *pint; 2668 2669 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2670 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 2671 return 0; 2672 2673 // for each id in our ids filter, find in the db 2674 for (i = 0; i < ids->count; i++) { 2675 if (query_is_full(results, limit)) 2676 break; 2677 2678 id = ndb_filter_get_id_element(filter, ids, i); 2679 ndb_tsid_init(&tsid, (unsigned char *)id, until); 2680 2681 k.mv_data = &tsid; 2682 k.mv_size = sizeof(tsid); 2683 2684 if (!ndb_cursor_start(cur, &k, &v)) 2685 continue; 2686 2687 ptsid = (struct ndb_tsid *)k.mv_data; 2688 note_id = *(uint64_t*)v.mv_data; 2689 2690 if (memcmp(id, ptsid->id, 32) == 0) 2691 matched |= 1 << NDB_FILTER_AUTHORS; 2692 else 2693 continue; 2694 2695 // get the note because we need it to match against the filter 2696 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 2697 continue; 2698 2699 // Sure this particular lookup matched the index query, but 2700 // does it match the entire filter? Check! We also pass in 2701 // things we've already matched via the filter so we don't have 2702 // to check again. This can be pretty important for filters 2703 // with a large number of entries. 2704 if (!ndb_filter_matches_with(filter, note, matched)) 2705 continue; 2706 2707 ndb_query_result_init(&res, note, note_size, note_id); 2708 if (!push_query_result(results, &res)) 2709 break; 2710 } 2711 2712 mdb_cursor_close(cur); 2713 return 1; 2714 } 2715 2716 // 2717 // encode a tag index key 2718 // 2719 // consists of: 2720 // 2721 // u8 tag 2722 // u8 tag_val_len 2723 // [u8] tag_val_bytes 2724 // u64 created_at 2725 // 2726 static int ndb_encode_tag_key(unsigned char *buf, int buf_size, 2727 char tag, const unsigned char *val, 2728 unsigned char val_len, 2729 uint64_t timestamp) 2730 { 2731 struct cursor writer; 2732 int ok; 2733 2734 // quick exit for obvious case where it will be too big. There can be 2735 // values of val_len that still fail, but we just let the writer handle 2736 // those failure cases 2737 if (val_len >= buf_size) 2738 return 0; 2739 2740 make_cursor(buf, buf + buf_size, &writer); 2741 2742 ok = 2743 cursor_push_byte(&writer, tag) && 2744 cursor_push(&writer, (unsigned char*)val, val_len) && 2745 cursor_push(&writer, (unsigned char*)×tamp, sizeof(timestamp)); 2746 2747 if (!ok) 2748 return 0; 2749 2750 return writer.p - writer.start; 2751 } 2752 2753 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn, 2754 struct ndb_filter *filter, 2755 struct ndb_query_results *results, 2756 int limit) 2757 { 2758 MDB_dbi db; 2759 MDB_val k, v; 2760 MDB_cursor *cur; 2761 int rc; 2762 struct ndb_note *note; 2763 struct ndb_tsid key, *pkey; 2764 uint64_t *pint, until, since, note_id; 2765 size_t note_size; 2766 struct ndb_query_result res; 2767 unsigned char high_key[32] = {0xFF}; 2768 2769 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2770 2771 until = UINT64_MAX; 2772 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 2773 until = *pint; 2774 2775 since = 0; 2776 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 2777 since = *pint; 2778 2779 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 2780 return 0; 2781 2782 // if we have until, start there, otherwise just use max 2783 ndb_tsid_init(&key, high_key, until); 2784 k.mv_data = &key; 2785 k.mv_size = sizeof(key); 2786 2787 if (!ndb_cursor_start(cur, &k, &v)) 2788 return 1; 2789 2790 while (!query_is_full(results, limit)) { 2791 pkey = (struct ndb_tsid *)k.mv_data; 2792 note_id = *(uint64_t*)v.mv_data; 2793 assert(v.mv_size == 8); 2794 2795 // TODO(perf): if we are only looking for IDs and have no other 2796 // condition, then we can use the ID in the index without 2797 // looking up the note. For now we always look up the note 2798 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 2799 goto next; 2800 2801 // does this entry match our filter? 2802 if (!ndb_filter_matches_with(filter, note, 0)) 2803 goto next; 2804 2805 // don't continue the scan if we're below `since` 2806 if (pkey->timestamp < since) 2807 break; 2808 2809 ndb_query_result_init(&res, note, (uint64_t)note_size, note_id); 2810 if (!push_query_result(results, &res)) 2811 break; 2812 next: 2813 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 2814 break; 2815 } 2816 2817 mdb_cursor_close(cur); 2818 return 1; 2819 } 2820 2821 static int ndb_query_plan_execute_tags(struct ndb_txn *txn, 2822 struct ndb_filter *filter, 2823 struct ndb_query_results *results, 2824 int limit) 2825 { 2826 MDB_cursor *cur; 2827 MDB_dbi db; 2828 MDB_val k, v; 2829 int len, taglen, rc, i; 2830 uint64_t *pint, until, note_id; 2831 size_t note_size; 2832 unsigned char key_buffer[255]; 2833 struct ndb_note *note; 2834 struct ndb_filter_elements *tags; 2835 unsigned char *tag; 2836 struct ndb_query_result res; 2837 2838 db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 2839 2840 if (!(tags = ndb_filter_get_elems(filter, NDB_FILTER_TAGS))) 2841 return 0; 2842 2843 until = UINT64_MAX; 2844 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 2845 until = *pint; 2846 2847 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 2848 return 0; 2849 2850 for (i = 0; i < tags->count; i++) { 2851 tag = ndb_filter_get_id_element(filter, tags, i); 2852 2853 taglen = tags->field.elem_type == NDB_ELEMENT_ID 2854 ? 32 : strlen((const char*)tag); 2855 2856 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 2857 tags->field.tag, tag, taglen, 2858 until))) 2859 return 0; 2860 2861 k.mv_data = key_buffer; 2862 k.mv_size = len; 2863 2864 if (!ndb_cursor_start(cur, &k, &v)) 2865 continue; 2866 2867 // for each id in our ids filter, find in the db 2868 while (!query_is_full(results, limit)) { 2869 // check if tag value matches, bail if not 2870 if (((unsigned char *)k.mv_data)[0] != tags->field.tag) 2871 break; 2872 2873 // check if tag value matches, bail if not 2874 if (taglen != k.mv_size - 9) 2875 break; 2876 2877 if (memcmp(k.mv_data+1, tag, k.mv_size-9)) 2878 break; 2879 2880 note_id = *(uint64_t*)v.mv_data; 2881 2882 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 2883 goto next; 2884 2885 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS)) 2886 goto next; 2887 2888 ndb_query_result_init(&res, note, note_size, note_id); 2889 if (!push_query_result(results, &res)) 2890 break; 2891 2892 next: 2893 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 2894 break; 2895 } 2896 } 2897 2898 mdb_cursor_close(cur); 2899 return 1; 2900 } 2901 2902 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, 2903 struct ndb_filter *filter, 2904 struct ndb_query_results *results, 2905 int limit) 2906 { 2907 MDB_cursor *cur; 2908 MDB_dbi db; 2909 MDB_val k, v; 2910 struct ndb_note *note; 2911 struct ndb_u64_tsid tsid, *ptsid; 2912 struct ndb_filter_elements *kinds; 2913 struct ndb_query_result res; 2914 uint64_t kind, note_id, until, *pint; 2915 size_t note_size; 2916 int i, rc; 2917 2918 // we should have kinds in a kinds filter! 2919 if (!(kinds = ndb_filter_get_elems(filter, NDB_FILTER_KINDS))) 2920 return 0; 2921 2922 until = UINT64_MAX; 2923 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 2924 until = *pint; 2925 2926 db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 2927 2928 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 2929 return 0; 2930 2931 for (i = 0; i < kinds->count; i++) { 2932 if (query_is_full(results, limit)) 2933 break; 2934 2935 kind = kinds->elements[i]; 2936 ndb_debug("kind %" PRIu64 "\n", kind); 2937 ndb_u64_tsid_init(&tsid, kind, until); 2938 2939 k.mv_data = &tsid; 2940 k.mv_size = sizeof(tsid); 2941 2942 if (!ndb_cursor_start(cur, &k, &v)) 2943 continue; 2944 2945 // for each id in our ids filter, find in the db 2946 while (!query_is_full(results, limit)) { 2947 ptsid = (struct ndb_u64_tsid *)k.mv_data; 2948 if (ptsid->u64 != kind) 2949 break; 2950 2951 note_id = *(uint64_t*)v.mv_data; 2952 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 2953 goto next; 2954 2955 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS)) 2956 goto next; 2957 2958 ndb_query_result_init(&res, note, note_size, note_id); 2959 if (!push_query_result(results, &res)) 2960 break; 2961 2962 next: 2963 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 2964 break; 2965 } 2966 } 2967 2968 mdb_cursor_close(cur); 2969 return 1; 2970 } 2971 2972 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter) 2973 { 2974 struct ndb_filter_elements *ids, *kinds, *authors, *tags; 2975 2976 ids = ndb_filter_get_elems(filter, NDB_FILTER_IDS); 2977 kinds = ndb_filter_get_elems(filter, NDB_FILTER_KINDS); 2978 authors = ndb_filter_get_elems(filter, NDB_FILTER_AUTHORS); 2979 tags = ndb_filter_get_elems(filter, NDB_FILTER_TAGS); 2980 2981 // this is rougly similar to the heuristic in strfry's dbscan 2982 if (ids) { 2983 return NDB_PLAN_IDS; 2984 } else if (authors && authors->count <= 5) { 2985 // TODO: actually implment author plan and use it 2986 //return NDB_PLAN_AUTHORS; 2987 return NDB_PLAN_CREATED; 2988 } else if (tags && tags->count <= 5) { 2989 return NDB_PLAN_TAGS; 2990 } else if (kinds) { 2991 return NDB_PLAN_KINDS; 2992 } 2993 2994 return NDB_PLAN_CREATED; 2995 } 2996 2997 static const char *ndb_query_plan_name(int plan_id) 2998 { 2999 switch (plan_id) { 3000 case NDB_PLAN_IDS: return "ids"; 3001 case NDB_PLAN_KINDS: return "kinds"; 3002 case NDB_PLAN_TAGS: return "tags"; 3003 case NDB_PLAN_CREATED: return "created"; 3004 case NDB_PLAN_AUTHORS: return "authors"; 3005 } 3006 3007 return "unknown"; 3008 } 3009 3010 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, 3011 struct ndb_query_result *res, int capacity, 3012 int *results_out) 3013 { 3014 struct ndb_query_results results; 3015 uint64_t limit, *pint; 3016 enum ndb_query_plan plan; 3017 limit = capacity; 3018 3019 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 3020 limit = *pint; 3021 3022 limit = min(capacity, limit); 3023 make_cursor((unsigned char *)res, 3024 ((unsigned char *)res) + limit * sizeof(*res), 3025 &results.cur); 3026 3027 plan = ndb_filter_plan(filter); 3028 ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan)); 3029 switch (plan) { 3030 // We have a list of ids, just open a cursor and jump to each once 3031 case NDB_PLAN_IDS: 3032 if (!ndb_query_plan_execute_ids(txn, filter, &results, limit)) 3033 return 0; 3034 break; 3035 3036 // We have just kinds, just scan the kind index 3037 case NDB_PLAN_KINDS: 3038 if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit)) 3039 return 0; 3040 break; 3041 3042 case NDB_PLAN_TAGS: 3043 if (!ndb_query_plan_execute_tags(txn, filter, &results, limit)) 3044 return 0; 3045 break; 3046 case NDB_PLAN_CREATED: 3047 if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit)) 3048 return 0; 3049 break; 3050 case NDB_PLAN_AUTHORS: 3051 // TODO: finish authors query plan 3052 return 0; 3053 } 3054 3055 *results_out = cursor_count(&results.cur, sizeof(*res)); 3056 return 1; 3057 } 3058 3059 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, 3060 struct ndb_query_result *results, int result_capacity, int *count) 3061 { 3062 int i, out; 3063 struct ndb_query_result *p = results; 3064 3065 out = 0; 3066 *count = 0; 3067 3068 for (i = 0; i < num_filters; i++) { 3069 if (!ndb_query_filter(txn, &filters[i], p, 3070 result_capacity, &out)) { 3071 return 0; 3072 } 3073 3074 *count += out; 3075 p += out; 3076 result_capacity -= out; 3077 if (result_capacity <= 0) 3078 break; 3079 } 3080 3081 // sort results 3082 qsort(results, *count, sizeof(*results), compare_query_results); 3083 return 1; 3084 } 3085 3086 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note, 3087 uint64_t note_key) 3088 { 3089 unsigned char key_buffer[255]; 3090 struct ndb_iterator iter; 3091 struct ndb_str tkey, tval; 3092 char tchar; 3093 int len, rc; 3094 MDB_val key, val; 3095 MDB_dbi tags_db; 3096 3097 tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3098 3099 ndb_tags_iterate_start(note, &iter); 3100 3101 while (ndb_tags_iterate_next(&iter)) { 3102 if (iter.tag->count < 2) 3103 continue; 3104 3105 tkey = ndb_tag_str(note, iter.tag, 0); 3106 3107 // we only write indices for 1-char tags. 3108 tchar = tkey.str[0]; 3109 if (tchar == 0 || tkey.str[1] != 0) 3110 continue; 3111 3112 tval = ndb_tag_str(note, iter.tag, 1); 3113 len = ndb_str_len(&tval); 3114 3115 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3116 tchar, tval.id, (unsigned char)len, 3117 ndb_note_created_at(note)))) { 3118 // this will fail when we try to encode a key that is 3119 // too big 3120 continue; 3121 } 3122 3123 //ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len); 3124 3125 key.mv_data = key_buffer; 3126 key.mv_size = len; 3127 3128 val.mv_data = ¬e_key; 3129 val.mv_size = sizeof(note_key); 3130 3131 if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) { 3132 ndb_debug("write note tag index to db failed: %s\n", 3133 mdb_strerror(rc)); 3134 return 0; 3135 } 3136 } 3137 3138 return 1; 3139 } 3140 3141 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 3142 uint64_t note_key) 3143 { 3144 struct ndb_u64_tsid tsid; 3145 int rc; 3146 MDB_val key, val; 3147 MDB_dbi kind_db; 3148 3149 ndb_u64_tsid_init(&tsid, note->kind, note->created_at); 3150 3151 key.mv_data = &tsid; 3152 key.mv_size = sizeof(tsid); 3153 val.mv_data = ¬e_key; 3154 val.mv_size = sizeof(note_key); 3155 3156 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3157 3158 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 3159 ndb_debug("write note kind index to db failed: %s\n", 3160 mdb_strerror(rc)); 3161 return 0; 3162 } 3163 3164 return 1; 3165 } 3166 3167 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 3168 int word_len, int word_index, 3169 uint64_t timestamp, uint64_t note_id) 3170 { 3171 // cap to some reasonable key size 3172 unsigned char buffer[1024]; 3173 int keysize, rc; 3174 MDB_val k, v; 3175 MDB_dbi text_db; 3176 3177 // build our compressed text index key 3178 if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index, 3179 word_len, word, timestamp, note_id, 3180 &keysize)) { 3181 // probably too big 3182 3183 return 0; 3184 } 3185 3186 k.mv_data = buffer; 3187 k.mv_size = keysize; 3188 3189 v.mv_data = NULL; 3190 v.mv_size = 0; 3191 3192 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 3193 3194 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 3195 ndb_debug("write note text index to db failed: %s\n", 3196 mdb_strerror(rc)); 3197 return 0; 3198 } 3199 3200 return 1; 3201 } 3202 3203 3204 3205 // break a string into individual words for querying or for building the 3206 // fulltext search index. This is callback based so we don't need to 3207 // build up an intermediate structure 3208 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 3209 { 3210 int word_len, words; 3211 const char *word; 3212 3213 words = 0; 3214 3215 while (cur->p < cur->end) { 3216 consume_whitespace_or_punctuation(cur); 3217 if (cur->p >= cur->end) 3218 break; 3219 word = (const char *)cur->p; 3220 3221 if (!consume_until_boundary(cur)) 3222 break; 3223 3224 // start of word or end 3225 word_len = cur->p - (unsigned char *)word; 3226 if (word_len == 0 && cur->p >= cur->end) 3227 break; 3228 3229 if (word_len == 0) { 3230 if (!cursor_skip(cur, 1)) 3231 break; 3232 continue; 3233 } 3234 3235 //ndb_debug("writing word index '%.*s'\n", word_len, word); 3236 3237 if (!fn(ctx, word, word_len, words)) 3238 continue; 3239 3240 words++; 3241 } 3242 3243 return 1; 3244 } 3245 3246 struct ndb_word_writer_ctx 3247 { 3248 struct ndb_txn *txn; 3249 struct ndb_note *note; 3250 uint64_t note_id; 3251 }; 3252 3253 static int ndb_fulltext_word_writer(void *ctx, 3254 const char *word, int word_len, int words) 3255 { 3256 struct ndb_word_writer_ctx *wctx = ctx; 3257 3258 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 3259 wctx->note->created_at, wctx->note_id)) { 3260 // too big to write this one, just skip it 3261 ndb_debug("failed to write word '%.*s' to index\n", word_len, word); 3262 3263 return 0; 3264 } 3265 3266 //fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word); 3267 return 1; 3268 } 3269 3270 static int ndb_write_note_fulltext_index(struct ndb_txn *txn, 3271 struct ndb_note *note, 3272 uint64_t note_id) 3273 { 3274 struct cursor cur; 3275 unsigned char *content; 3276 struct ndb_str str; 3277 struct ndb_word_writer_ctx ctx; 3278 3279 str = ndb_note_str(note, ¬e->content); 3280 // I don't think this should happen? 3281 if (unlikely(str.flag == NDB_PACKED_ID)) 3282 return 0; 3283 3284 content = (unsigned char *)str.str; 3285 3286 make_cursor(content, content + note->content_length, &cur); 3287 3288 ctx.txn = txn; 3289 ctx.note = note; 3290 ctx.note_id = note_id; 3291 3292 ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer); 3293 3294 return 1; 3295 } 3296 3297 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index) 3298 { 3299 (void)word_index; 3300 struct ndb_search_words *words = ctx; 3301 struct ndb_word *word; 3302 3303 if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS) 3304 return 0; 3305 3306 word = &words->words[words->num_words++]; 3307 word->word = word_str; 3308 word->word_len = word_len; 3309 3310 return 1; 3311 } 3312 3313 static void ndb_search_words_init(struct ndb_search_words *words) 3314 { 3315 words->num_words = 0; 3316 } 3317 3318 static int prefix_count(const char *str1, int len1, const char *str2, int len2) { 3319 int i, count = 0; 3320 int min_len = len1 < len2 ? len1 : len2; 3321 3322 for (i = 0; i < min_len; i++) { 3323 // case insensitive 3324 if (tolower(str1[i]) == tolower(str2[i])) 3325 count++; 3326 else 3327 break; 3328 } 3329 3330 return count; 3331 } 3332 3333 static int ndb_prefix_matches(struct ndb_text_search_result *result, 3334 struct ndb_word *search_word) 3335 { 3336 // Empty strings shouldn't happen but let's 3337 if (result->key.str_len < 2 || search_word->word_len < 2) 3338 return 0; 3339 3340 // make sure we at least have two matching prefix characters. exact 3341 // matches are nice but range searches allow us to match prefixes as 3342 // well. A double-char prefix is suffient, but maybe we could up this 3343 // in the future. 3344 // 3345 // TODO: How are we handling utf-8 prefix matches like 3346 // japanese? 3347 // 3348 if ( result->key.str[0] != tolower(search_word->word[0]) 3349 && result->key.str[1] != tolower(search_word->word[1]) 3350 ) 3351 return 0; 3352 3353 // count the number of prefix-matched characters. This will be used 3354 // for ranking search results 3355 result->prefix_chars = prefix_count(result->key.str, 3356 result->key.str_len, 3357 search_word->word, 3358 search_word->word_len); 3359 3360 if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 3361 return 0; 3362 3363 return 1; 3364 } 3365 3366 // This is called when scanning the full text search index. Scanning stops 3367 // when we no longer have a prefix match for the word 3368 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, 3369 MDB_val *k, struct ndb_word *search_word, 3370 struct ndb_text_search_result *last_result, 3371 struct ndb_text_search_result *result, 3372 MDB_cursor_op order_op) 3373 { 3374 struct cursor key_cursor; 3375 //struct ndb_text_search_key search_key; 3376 MDB_val v; 3377 int retries; 3378 retries = -1; 3379 3380 make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); 3381 3382 // When op is MDB_SET_RANGE, this initializes the search. Position 3383 // the cursor at the next key greater than or equal to the specified 3384 // key. 3385 // 3386 // Subsequent searches should use MDB_NEXT 3387 if (mdb_cursor_get(cursor, k, &v, op)) { 3388 // we should only do this if we're going in reverse 3389 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 3390 // if set range worked and our key exists, it should be 3391 // the one right before this one 3392 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 3393 return 0; 3394 } else { 3395 return 0; 3396 } 3397 } 3398 3399 retry: 3400 retries++; 3401 /* 3402 printf("continuing from "); 3403 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 3404 ndb_print_text_search_key(&search_key); 3405 } else { printf("??"); } 3406 printf("\n"); 3407 */ 3408 3409 make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); 3410 3411 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 3412 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 3413 return 0; 3414 } 3415 3416 if (last_result) { 3417 if (last_result->key.note_id != result->key.note_id) 3418 return 0; 3419 } 3420 3421 // On success, this could still be not related at all. 3422 // It could just be adjacent to the word. Let's check 3423 // if we have a matching prefix at least. 3424 3425 // Before we unpack the entire key, let's quickly 3426 // unpack just the string to check the prefix. We don't 3427 // need to unpack the entire key if the prefix doesn't 3428 // match 3429 if (!ndb_unpack_text_search_key_string(&key_cursor, 3430 &result->key.str, 3431 &result->key.str_len)) { 3432 // this should never happen 3433 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 3434 return 0; 3435 } 3436 3437 if (!ndb_prefix_matches(result, search_word)) { 3438 /* 3439 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 3440 result->key.str_len, result->key.str, 3441 search_word->word_len, search_word->word); 3442 */ 3443 // we should only do this if we're going in reverse 3444 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 3445 // if set range worked and our key exists, it should be 3446 // the one right before this one 3447 mdb_cursor_get(cursor, k, &v, MDB_PREV); 3448 goto retry; 3449 } else { 3450 return 0; 3451 } 3452 } 3453 3454 // Unpack the remaining text search key, we will need this information 3455 // when building up our search results. 3456 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 3457 // This should never happen 3458 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 3459 return 0; 3460 } 3461 3462 /* 3463 if (last_result) { 3464 if (result->key.word_index < last_result->key.word_index) { 3465 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 3466 result->key.str_len, result->key.str, 3467 last_result->key.str_len, last_result->key.str); 3468 return 0; 3469 } 3470 } 3471 */ 3472 3473 return 1; 3474 } 3475 3476 static void ndb_text_search_results_init( 3477 struct ndb_text_search_results *results) { 3478 results->num_results = 0; 3479 } 3480 3481 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 3482 { 3483 cfg->order = NDB_ORDER_DESCENDING; 3484 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 3485 } 3486 3487 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 3488 enum ndb_search_order order) 3489 { 3490 cfg->order = order; 3491 } 3492 3493 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 3494 { 3495 cfg->limit = limit; 3496 } 3497 3498 int ndb_text_search(struct ndb_txn *txn, const char *query, 3499 struct ndb_text_search_results *results, 3500 struct ndb_text_search_config *config) 3501 { 3502 unsigned char buffer[1024], *buf; 3503 unsigned char saved_buf[1024], *saved; 3504 struct ndb_text_search_result *result, *last_result; 3505 struct ndb_text_search_result candidate, last_candidate; 3506 struct ndb_search_words search_words; 3507 //struct ndb_text_search_key search_key; 3508 struct ndb_word *search_word; 3509 struct cursor cur; 3510 ndb_text_search_key_order_fn key_order_fn; 3511 MDB_dbi text_db; 3512 MDB_cursor *cursor; 3513 MDB_val k, v; 3514 int i, j, keysize, saved_size, limit; 3515 MDB_cursor_op op, order_op; 3516 3517 saved = NULL; 3518 ndb_text_search_results_init(results); 3519 ndb_search_words_init(&search_words); 3520 3521 // search config 3522 limit = MAX_TEXT_SEARCH_RESULTS; 3523 order_op = MDB_PREV; 3524 key_order_fn = ndb_make_text_search_key_high; 3525 if (config) { 3526 if (config->order == NDB_ORDER_ASCENDING) { 3527 order_op = MDB_NEXT; 3528 key_order_fn = ndb_make_text_search_key_low; 3529 } 3530 limit = min(limit, config->limit); 3531 } 3532 // end search config 3533 3534 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 3535 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 3536 3537 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 3538 if (search_words.num_words == 0) 3539 return 0; 3540 3541 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 3542 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 3543 return 0; 3544 } 3545 3546 // for each word, we recursively find all of the submatches 3547 while (results->num_results < limit) { 3548 last_result = NULL; 3549 result = &results->results[results->num_results]; 3550 3551 // if we have saved, then we continue from the last root search 3552 // sequence 3553 if (saved) { 3554 buf = saved_buf; 3555 saved = NULL; 3556 keysize = saved_size; 3557 3558 k.mv_data = buf; 3559 k.mv_size = saved_size; 3560 3561 // reposition the cursor so we can continue 3562 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 3563 break; 3564 3565 op = order_op; 3566 } else { 3567 // construct a packed fulltext search key using this 3568 // word this key doesn't contain any timestamp or index 3569 // info, so it should range match instead of exact 3570 // match 3571 if (!key_order_fn(buffer, sizeof(buffer), 3572 search_words.words[0].word_len, 3573 search_words.words[0].word, &keysize)) 3574 { 3575 // word is too big to fit in 1024-sized key 3576 continue; 3577 } 3578 3579 buf = buffer; 3580 op = MDB_SET_RANGE; 3581 } 3582 3583 for (j = 0; j < search_words.num_words; j++) { 3584 search_word = &search_words.words[j]; 3585 3586 // shouldn't happen but let's be defensive a bit 3587 if (search_word->word_len == 0) 3588 continue; 3589 3590 // if we already matched a note in this phrase, make 3591 // sure we're including the note id in the query 3592 if (last_result) { 3593 // we are narrowing down a search. 3594 // if we already have this note id, just continue 3595 for (i = 0; i < results->num_results; i++) { 3596 if (results->results[i].key.note_id == last_result->key.note_id) 3597 goto cont; 3598 } 3599 3600 if (!ndb_make_noted_text_search_key( 3601 buffer, sizeof(buffer), 3602 search_word->word_len, 3603 search_word->word, 3604 last_result->key.timestamp, 3605 last_result->key.note_id, 3606 &keysize)) 3607 { 3608 continue; 3609 } 3610 3611 buf = buffer; 3612 } 3613 3614 k.mv_data = buf; 3615 k.mv_size = keysize; 3616 3617 if (!ndb_text_search_next_word(cursor, op, &k, 3618 search_word, 3619 last_result, 3620 &candidate, 3621 order_op)) { 3622 break; 3623 } 3624 3625 *result = candidate; 3626 op = MDB_SET_RANGE; 3627 3628 // save the first key match, since we will continue from 3629 // this on the next root word result 3630 if (j == 0 && !saved) { 3631 memcpy(saved_buf, k.mv_data, k.mv_size); 3632 saved = saved_buf; 3633 saved_size = k.mv_size; 3634 } 3635 3636 last_candidate = *result; 3637 last_result = &last_candidate; 3638 } 3639 3640 cont: 3641 // we matched all of the queries! 3642 if (j == search_words.num_words) { 3643 results->num_results++; 3644 } else if (j == 0) { 3645 break; 3646 } 3647 3648 } 3649 3650 mdb_cursor_close(cursor); 3651 3652 return 1; 3653 } 3654 3655 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key, 3656 struct ndb_blocks *blocks) 3657 { 3658 int rc; 3659 MDB_val key, val; 3660 3661 // make sure we're not writing the owned flag to the db 3662 blocks->flags &= ~NDB_BLOCK_FLAG_OWNED; 3663 3664 key.mv_data = ¬e_key; 3665 key.mv_size = sizeof(note_key); 3666 val.mv_data = blocks; 3667 val.mv_size = ndb_blocks_total_size(blocks); 3668 assert((val.mv_size % 8) == 0); 3669 3670 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) { 3671 ndb_debug("write version to note_blocks failed: %s\n", 3672 mdb_strerror(rc)); 3673 return; 3674 } 3675 } 3676 3677 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, 3678 uint64_t note_key, unsigned char *scratch, 3679 size_t scratch_size) 3680 { 3681 size_t content_len; 3682 const char *content; 3683 struct ndb_blocks *blocks; 3684 3685 content_len = ndb_note_content_length(note); 3686 content = ndb_note_content(note); 3687 3688 if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) { 3689 //ndb_debug("failed to parse content '%.*s'\n", content_len, content); 3690 return 0; 3691 } 3692 3693 ndb_write_blocks(txn, note_key, blocks); 3694 return 1; 3695 } 3696 3697 static uint64_t ndb_write_note(struct ndb_txn *txn, 3698 struct ndb_writer_note *note, 3699 unsigned char *scratch, size_t scratch_size) 3700 { 3701 int rc; 3702 uint64_t note_key; 3703 MDB_dbi note_db; 3704 MDB_val key, val; 3705 3706 // let's quickly sanity check if we already have this note 3707 if (ndb_get_notekey_by_id(txn, note->note->id)) 3708 return 0; 3709 3710 // get dbs 3711 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 3712 3713 // get new key 3714 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 3715 3716 // write note to event store 3717 key.mv_data = ¬e_key; 3718 key.mv_size = sizeof(note_key); 3719 val.mv_data = note->note; 3720 val.mv_size = note->note_len; 3721 3722 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 3723 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 3724 return 0; 3725 } 3726 3727 ndb_write_note_id_index(txn, note->note, note_key); 3728 ndb_write_note_kind_index(txn, note->note, note_key); 3729 ndb_write_note_tag_index(txn, note->note, note_key); 3730 3731 // only parse content and do fulltext index on text and longform notes 3732 if (note->note->kind == 1 || note->note->kind == 30023) { 3733 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 3734 return 0; 3735 3736 // write note blocks 3737 ndb_write_new_blocks(txn, note->note, note_key, scratch, 3738 scratch_size); 3739 } 3740 3741 if (note->note->kind == 7) { 3742 ndb_write_reaction_stats(txn, note->note); 3743 } 3744 3745 return note_key; 3746 } 3747 3748 // only to be called from the writer thread 3749 static void ndb_write_version(struct ndb_txn *txn, uint64_t version) 3750 { 3751 int rc; 3752 MDB_val key, val; 3753 uint64_t version_key; 3754 3755 version_key = NDB_META_KEY_VERSION; 3756 3757 key.mv_data = &version_key; 3758 key.mv_size = sizeof(version_key); 3759 val.mv_data = &version; 3760 val.mv_size = sizeof(version); 3761 3762 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { 3763 ndb_debug("write version to ndb_meta failed: %s\n", 3764 mdb_strerror(rc)); 3765 return; 3766 } 3767 3768 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 3769 } 3770 3771 struct written_note { 3772 uint64_t note_id; 3773 struct ndb_writer_note *note; 3774 }; 3775 3776 // When the data has been committed to the database, take all of the written 3777 // notes, check them against subscriptions, and then write to the subscription 3778 // inbox for all matching notes 3779 static void ndb_notify_subscriptions(struct ndb_monitor *monitor, 3780 struct written_note *wrote, int num_notes) 3781 { 3782 int i, k; 3783 int pushed; 3784 struct written_note *written; 3785 struct ndb_note *note; 3786 struct ndb_subscription *sub; 3787 3788 for (i = 0; i < monitor->num_subscriptions; i++) { 3789 sub = &monitor->subscriptions[i]; 3790 ndb_debug("checking subscription %d, %d notes\n", i, num_notes); 3791 3792 pushed = 0; 3793 for (k = 0; k < num_notes; k++) { 3794 written = &wrote[k]; 3795 note = written->note->note; 3796 3797 if (ndb_filter_group_matches(&sub->group, note)) { 3798 ndb_debug("pushing note\n"); 3799 3800 if (!prot_queue_push(&sub->inbox, &written->note_id)) { 3801 ndb_debug("couldn't push note to subscriber"); 3802 } else { 3803 pushed++; 3804 } 3805 } else { 3806 ndb_debug("not pushing note\n"); 3807 } 3808 } 3809 3810 // After pushing all of the matching notes, check to see if we 3811 // have a registered subscription callback. If so, we call it. 3812 // The callback needs to call ndb_poll_for_notes to pull data 3813 // that was just pushed to the queue in the for loop above. 3814 if (monitor->sub_cb != NULL && pushed > 0) { 3815 monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); 3816 } 3817 } 3818 } 3819 3820 static void *ndb_writer_thread(void *data) 3821 { 3822 struct ndb_writer *writer = data; 3823 struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; 3824 struct written_note written_notes[THREAD_QUEUE_BATCH]; 3825 size_t scratch_size; 3826 int i, popped, done, any_note, num_notes; 3827 uint64_t note_nkey; 3828 struct ndb_txn txn; 3829 unsigned char *scratch; 3830 3831 // 8mb scratch buffer for parsing note content 3832 scratch_size = 8 * 1024 * 1024; 3833 scratch = malloc(scratch_size); 3834 MDB_txn *mdb_txn = NULL; 3835 ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); 3836 3837 done = 0; 3838 while (!done) { 3839 txn.mdb_txn = NULL; 3840 num_notes = 0; 3841 popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); 3842 ndb_debug("writer popped %d items\n", popped); 3843 3844 any_note = 0; 3845 for (i = 0 ; i < popped; i++) { 3846 msg = &msgs[i]; 3847 switch (msg->type) { 3848 case NDB_WRITER_NOTE: any_note = 1; break; 3849 case NDB_WRITER_PROFILE: any_note = 1; break; 3850 case NDB_WRITER_DBMETA: any_note = 1; break; 3851 case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break; 3852 case NDB_WRITER_BLOCKS: any_note = 1; break; 3853 case NDB_WRITER_QUIT: break; 3854 } 3855 } 3856 3857 if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) 3858 { 3859 fprintf(stderr, "writer thread txn_begin failed"); 3860 // should definitely not happen unless DB is full 3861 // or something ? 3862 continue; 3863 } 3864 3865 for (i = 0; i < popped; i++) { 3866 msg = &msgs[i]; 3867 3868 switch (msg->type) { 3869 case NDB_WRITER_QUIT: 3870 // quits are handled before this 3871 done = 1; 3872 continue; 3873 case NDB_WRITER_PROFILE: 3874 note_nkey = 3875 ndb_write_note(&txn, &msg->note, 3876 scratch, scratch_size); 3877 if (note_nkey > 0) { 3878 written_notes[num_notes++] = 3879 (struct written_note){ 3880 .note_id = note_nkey, 3881 .note = &msg->note, 3882 }; 3883 } else { 3884 ndb_debug("failed to write note\n"); 3885 } 3886 if (msg->profile.record.builder) { 3887 // only write if parsing didn't fail 3888 ndb_write_profile(&txn, &msg->profile, 3889 note_nkey); 3890 } 3891 break; 3892 case NDB_WRITER_NOTE: 3893 note_nkey = ndb_write_note(&txn, &msg->note, 3894 scratch, 3895 scratch_size); 3896 3897 if (note_nkey > 0) { 3898 written_notes[num_notes++] = (struct written_note){ 3899 .note_id = note_nkey, 3900 .note = &msg->note, 3901 }; 3902 } 3903 break; 3904 case NDB_WRITER_DBMETA: 3905 ndb_write_version(&txn, msg->ndb_meta.version); 3906 break; 3907 case NDB_WRITER_BLOCKS: 3908 ndb_write_blocks(&txn, msg->blocks.note_key, 3909 msg->blocks.blocks); 3910 break; 3911 case NDB_WRITER_PROFILE_LAST_FETCH: 3912 ndb_writer_last_profile_fetch(&txn, 3913 msg->last_fetch.pubkey, 3914 msg->last_fetch.fetched_at 3915 ); 3916 break; 3917 } 3918 } 3919 3920 // commit writes 3921 if (any_note) { 3922 if (!ndb_end_query(&txn)) { 3923 ndb_debug("writer thread txn commit failed\n"); 3924 } else { 3925 ndb_debug("notifying subscriptions, %d notes\n", num_notes); 3926 ndb_notify_subscriptions(writer->monitor, 3927 written_notes, 3928 num_notes); 3929 // update subscriptions 3930 } 3931 } 3932 3933 // free notes 3934 for (i = 0; i < popped; i++) { 3935 msg = &msgs[i]; 3936 if (msg->type == NDB_WRITER_NOTE) { 3937 free(msg->note.note); 3938 } else if (msg->type == NDB_WRITER_PROFILE) { 3939 free(msg->profile.note.note); 3940 ndb_profile_record_builder_free(&msg->profile.record); 3941 } else if (msg->type == NDB_WRITER_BLOCKS) { 3942 ndb_blocks_free(msg->blocks.blocks); 3943 } 3944 } 3945 } 3946 3947 free(scratch); 3948 ndb_debug("quitting writer thread\n"); 3949 return NULL; 3950 } 3951 3952 static void *ndb_ingester_thread(void *data) 3953 { 3954 secp256k1_context *ctx; 3955 struct thread *thread = data; 3956 struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; 3957 struct ndb_lmdb *lmdb = ingester->writer->lmdb; 3958 struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; 3959 struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; 3960 int i, to_write, popped, done, any_event; 3961 MDB_txn *read_txn = NULL; 3962 int rc; 3963 3964 ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); 3965 ndb_debug("started ingester thread\n"); 3966 3967 done = 0; 3968 while (!done) { 3969 to_write = 0; 3970 any_event = 0; 3971 3972 popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 3973 ndb_debug("ingester popped %d items\n", popped); 3974 3975 for (i = 0; i < popped; i++) { 3976 msg = &msgs[i]; 3977 if (msg->type == NDB_INGEST_EVENT) { 3978 any_event = 1; 3979 break; 3980 } 3981 } 3982 3983 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 3984 // this is bad 3985 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 3986 mdb_strerror(rc)); 3987 continue; 3988 } 3989 3990 for (i = 0; i < popped; i++) { 3991 msg = &msgs[i]; 3992 switch (msg->type) { 3993 case NDB_INGEST_QUIT: 3994 done = 1; 3995 break; 3996 3997 case NDB_INGEST_EVENT: 3998 out = &outs[to_write]; 3999 if (ndb_ingester_process_event(ctx, ingester, 4000 &msg->event, out, 4001 read_txn)) { 4002 to_write++; 4003 } 4004 } 4005 } 4006 4007 if (any_event) 4008 mdb_txn_abort(read_txn); 4009 4010 if (to_write > 0) { 4011 ndb_debug("pushing %d events to write queue\n", to_write); 4012 if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) { 4013 ndb_debug("failed pushing %d events to write queue\n", to_write); 4014 } 4015 } 4016 } 4017 4018 ndb_debug("quitting ingester thread\n"); 4019 secp256k1_context_destroy(ctx); 4020 return NULL; 4021 } 4022 4023 4024 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, 4025 struct ndb_monitor *monitor) 4026 { 4027 writer->lmdb = lmdb; 4028 writer->monitor = monitor; 4029 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 4030 writer->queue_buf = malloc(writer->queue_buflen); 4031 if (writer->queue_buf == NULL) { 4032 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 4033 return 0; 4034 } 4035 4036 // init the writer queue. 4037 prot_queue_init(&writer->inbox, writer->queue_buf, 4038 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 4039 4040 // spin up the writer thread 4041 if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer)) 4042 { 4043 fprintf(stderr, "ndb writer thread failed to create\n"); 4044 return 0; 4045 } 4046 4047 return 1; 4048 } 4049 4050 // initialize the ingester queue and then spawn the thread 4051 static int ndb_ingester_init(struct ndb_ingester *ingester, 4052 struct ndb_writer *writer, 4053 const struct ndb_config *config) 4054 { 4055 int elem_size, num_elems; 4056 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 4057 4058 // TODO: configurable queue sizes 4059 elem_size = sizeof(struct ndb_ingester_msg); 4060 num_elems = DEFAULT_QUEUE_SIZE; 4061 4062 ingester->writer = writer; 4063 ingester->flags = config->flags; 4064 ingester->filter = config->ingest_filter; 4065 ingester->filter_context = config->filter_context; 4066 4067 if (!threadpool_init(&ingester->tp, config->ingester_threads, 4068 elem_size, num_elems, &quit_msg, ingester, 4069 ndb_ingester_thread)) 4070 { 4071 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 4072 return 0; 4073 } 4074 4075 return 1; 4076 } 4077 4078 static int ndb_writer_destroy(struct ndb_writer *writer) 4079 { 4080 struct ndb_writer_msg msg; 4081 4082 // kill thread 4083 msg.type = NDB_WRITER_QUIT; 4084 if (!prot_queue_push(&writer->inbox, &msg)) { 4085 // queue is too full to push quit message. just kill it. 4086 pthread_exit(&writer->thread_id); 4087 } else { 4088 pthread_join(writer->thread_id, NULL); 4089 } 4090 4091 // cleanup 4092 prot_queue_destroy(&writer->inbox); 4093 4094 free(writer->queue_buf); 4095 4096 return 1; 4097 } 4098 4099 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 4100 { 4101 threadpool_destroy(&ingester->tp); 4102 return 1; 4103 } 4104 4105 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 4106 char *json, unsigned len, unsigned client) 4107 { 4108 struct ndb_ingester_msg msg; 4109 msg.type = NDB_INGEST_EVENT; 4110 4111 msg.event.json = json; 4112 msg.event.len = len; 4113 msg.event.client = client; 4114 4115 return threadpool_dispatch(&ingester->tp, &msg); 4116 } 4117 4118 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 4119 { 4120 int rc; 4121 MDB_txn *txn; 4122 4123 if ((rc = mdb_env_create(&lmdb->env))) { 4124 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 4125 return 0; 4126 } 4127 4128 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 4129 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 4130 return 0; 4131 } 4132 4133 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 4134 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 4135 return 0; 4136 } 4137 4138 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 4139 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 4140 return 0; 4141 } 4142 4143 // Initialize DBs 4144 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 4145 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 4146 return 0; 4147 } 4148 4149 // note flatbuffer db 4150 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 4151 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 4152 return 0; 4153 } 4154 4155 // note metadata db 4156 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 4157 fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); 4158 return 0; 4159 } 4160 4161 // profile flatbuffer db 4162 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 4163 fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc); 4164 return 0; 4165 } 4166 4167 // profile search db 4168 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 4169 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 4170 return 0; 4171 } 4172 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 4173 4174 // ndb metadata (db version, etc) 4175 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 4176 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 4177 return 0; 4178 } 4179 4180 // profile last fetches 4181 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 4182 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 4183 return 0; 4184 } 4185 4186 // id+ts index flags 4187 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 4188 4189 // index dbs 4190 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 4191 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 4192 return 0; 4193 } 4194 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 4195 4196 if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 4197 fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc)); 4198 return 0; 4199 } 4200 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare); 4201 4202 if ((rc = mdb_dbi_open(txn, "note_kind", 4203 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4204 &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 4205 fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc)); 4206 return 0; 4207 } 4208 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare); 4209 4210 if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 4211 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 4212 fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc)); 4213 return 0; 4214 } 4215 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 4216 4217 if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY, 4218 &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) { 4219 fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc)); 4220 return 0; 4221 } 4222 4223 if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, 4224 &lmdb->dbs[NDB_DB_NOTE_TAGS]))) { 4225 fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc)); 4226 return 0; 4227 } 4228 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare); 4229 4230 // Commit the transaction 4231 if ((rc = mdb_txn_commit(txn))) { 4232 fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); 4233 return 0; 4234 } 4235 4236 return 1; 4237 } 4238 4239 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 4240 { 4241 struct ndb_writer_msg msg; 4242 msg.type = NDB_WRITER_DBMETA; 4243 msg.ndb_meta.version = version; 4244 return ndb_writer_queue_msg(&ndb->writer, &msg); 4245 } 4246 4247 static int ndb_run_migrations(struct ndb *ndb) 4248 { 4249 int64_t version, latest_version, i; 4250 4251 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 4252 4253 if ((version = ndb_db_version(ndb)) == -1) { 4254 ndb_debug("run_migrations: no version found, assuming new db\n"); 4255 version = latest_version; 4256 4257 // no version found. fresh db? 4258 if (!ndb_queue_write_version(ndb, version)) { 4259 fprintf(stderr, "run_migrations: failed writing db version"); 4260 return 0; 4261 } 4262 4263 return 1; 4264 } else { 4265 ndb_debug("ndb: version %" PRIu64 " found\n", version); 4266 } 4267 4268 if (version < latest_version) 4269 ndb_debug("nostrdb: migrating v%d -> v%d\n", 4270 (int)version, (int)latest_version); 4271 4272 for (i = version; i < latest_version; i++) { 4273 if (!MIGRATIONS[i].fn(ndb)) { 4274 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 4275 return 0; 4276 } 4277 4278 if (!ndb_queue_write_version(ndb, i+1)) { 4279 fprintf(stderr, "run_migrations: failed writing db version"); 4280 return 0; 4281 } 4282 4283 version = i+1; 4284 } 4285 4286 ndb->version = version; 4287 4288 return 1; 4289 } 4290 4291 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, 4292 void *sub_cb_ctx) 4293 { 4294 monitor->num_subscriptions = 0; 4295 monitor->sub_cb = cb; 4296 monitor->sub_cb_ctx = sub_cb_ctx; 4297 } 4298 4299 void ndb_filter_group_destroy(struct ndb_filter_group *group) 4300 { 4301 struct ndb_filter *filter; 4302 int i; 4303 for (i = 0; i < group->num_filters; i++) { 4304 filter = &group->filters[i]; 4305 ndb_filter_destroy(filter); 4306 } 4307 } 4308 4309 static void ndb_subscription_destroy(struct ndb_subscription *sub) 4310 { 4311 ndb_filter_group_destroy(&sub->group); 4312 prot_queue_destroy(&sub->inbox); 4313 sub->subid = 0; 4314 } 4315 4316 static void ndb_monitor_destroy(struct ndb_monitor *monitor) 4317 { 4318 int i; 4319 4320 for (i = 0; i < monitor->num_subscriptions; i++) { 4321 ndb_subscription_destroy(&monitor->subscriptions[i]); 4322 } 4323 } 4324 4325 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config) 4326 { 4327 struct ndb *ndb; 4328 //MDB_dbi ind_id; // TODO: ind_pk, etc 4329 4330 ndb = *pndb = calloc(1, sizeof(struct ndb)); 4331 ndb->flags = config->flags; 4332 4333 if (ndb == NULL) { 4334 fprintf(stderr, "ndb_init: malloc failed\n"); 4335 return 0; 4336 } 4337 4338 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 4339 return 0; 4340 4341 ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); 4342 4343 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) { 4344 fprintf(stderr, "ndb_writer_init failed\n"); 4345 return 0; 4346 } 4347 4348 if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) { 4349 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 4350 config->ingester_threads); 4351 return 0; 4352 } 4353 4354 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) && 4355 !ndb_run_migrations(ndb)) { 4356 fprintf(stderr, "failed to run migrations\n"); 4357 return 0; 4358 } 4359 4360 // Initialize LMDB environment and spin up threads 4361 return 1; 4362 } 4363 4364 void ndb_destroy(struct ndb *ndb) 4365 { 4366 if (ndb == NULL) 4367 return; 4368 4369 // ingester depends on writer and must be destroyed first 4370 ndb_ingester_destroy(&ndb->ingester); 4371 ndb_writer_destroy(&ndb->writer); 4372 ndb_monitor_destroy(&ndb->monitor); 4373 4374 mdb_env_close(ndb->lmdb.env); 4375 4376 free(ndb); 4377 } 4378 4379 // Process a nostr event from a client 4380 // 4381 // ie: ["EVENT", {"content":"..."} ...] 4382 // 4383 // The client-sent variation of ndb_process_event 4384 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 4385 { 4386 // Since we need to return as soon as possible, and we're not 4387 // making any assumptions about the lifetime of the string, we 4388 // definitely need to copy the json here. In the future once we 4389 // have our thread that manages a websocket connection, we can 4390 // avoid the copy and just use the buffer we get from that 4391 // thread. 4392 char *json_copy = strdupn(json, len); 4393 if (json_copy == NULL) 4394 return 0; 4395 4396 return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1); 4397 } 4398 4399 // Process anostr event from a relay, 4400 // 4401 // ie: ["EVENT", "subid", {"content":"..."}...] 4402 // 4403 // This function returns as soon as possible, first copying the passed 4404 // json and then queueing it up for processing. Worker threads then take 4405 // the json and process it. 4406 // 4407 // Processing: 4408 // 4409 // 1. The event is parsed into ndb_notes and the signature is validated 4410 // 2. A quick lookup is made on the database to see if we already have 4411 // the note id, if we do we don't need to waste time on json parsing 4412 // or note validation. 4413 // 3. Once validation is done we pass it to the writer queue for writing 4414 // to LMDB. 4415 // 4416 int ndb_process_event(struct ndb *ndb, const char *json, int json_len) 4417 { 4418 // Since we need to return as soon as possible, and we're not 4419 // making any assumptions about the lifetime of the string, we 4420 // definitely need to copy the json here. In the future once we 4421 // have our thread that manages a websocket connection, we can 4422 // avoid the copy and just use the buffer we get from that 4423 // thread. 4424 char *json_copy = strdupn(json, json_len); 4425 if (json_copy == NULL) 4426 return 0; 4427 4428 return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0); 4429 } 4430 4431 4432 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client) 4433 { 4434 const char *start, *end, *very_end; 4435 start = ldjson; 4436 end = start + json_len; 4437 very_end = ldjson + json_len; 4438 int (* process)(struct ndb *, const char *, int); 4439 #if DEBUG 4440 int processed = 0; 4441 #endif 4442 process = client ? ndb_process_client_event : ndb_process_event; 4443 4444 while ((end = fast_strchr(start, '\n', very_end - start))) { 4445 //printf("processing '%.*s'\n", (int)(end-start), start); 4446 if (!process(ndb, start, end - start)) { 4447 ndb_debug("ndb_process_client_event failed\n"); 4448 return 0; 4449 } 4450 start = end + 1; 4451 #if DEBUG 4452 processed++; 4453 #endif 4454 } 4455 4456 #if DEBUG 4457 ndb_debug("ndb_process_events: processed %d events\n", processed); 4458 #endif 4459 4460 return 1; 4461 } 4462 4463 int ndb_process_events_stream(struct ndb *ndb, FILE* fp) 4464 { 4465 char *line = NULL; 4466 size_t len = 0; 4467 ssize_t nread; 4468 4469 while ((nread = getline(&line, &len, fp)) != -1) { 4470 if (line == NULL) 4471 break; 4472 ndb_process_event(ndb, line, len); 4473 } 4474 4475 if (line) 4476 free(line); 4477 4478 return 1; 4479 } 4480 4481 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) 4482 { 4483 return _ndb_process_events(ndb, ldjson, json_len, 1); 4484 } 4485 4486 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len) 4487 { 4488 return _ndb_process_events(ndb, ldjson, json_len, 0); 4489 } 4490 4491 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag) 4492 { 4493 return cursor_push_u16(cur, tag->count); 4494 } 4495 4496 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, 4497 size_t bufsize) 4498 { 4499 struct ndb_note *note; 4500 int half, size, str_indices_size; 4501 4502 // come on bruh 4503 if (bufsize < sizeof(struct ndb_note) * 2) 4504 return 0; 4505 4506 str_indices_size = bufsize / 32; 4507 size = bufsize - str_indices_size; 4508 half = size / 2; 4509 4510 //debug("size %d half %d str_indices %d\n", size, half, str_indices_size); 4511 4512 // make a safe cursor of our available memory 4513 make_cursor(buf, buf + bufsize, &builder->mem); 4514 4515 note = builder->note = (struct ndb_note *)buf; 4516 4517 // take slices of the memory into subcursors 4518 if (!(cursor_slice(&builder->mem, &builder->note_cur, half) && 4519 cursor_slice(&builder->mem, &builder->strings, half) && 4520 cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) { 4521 return 0; 4522 } 4523 4524 memset(note, 0, sizeof(*note)); 4525 builder->note_cur.p += sizeof(*note); 4526 4527 note->strings = builder->strings.start - buf; 4528 note->version = 1; 4529 4530 return 1; 4531 } 4532 4533 4534 4535 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 4536 const char *json, int json_len, 4537 unsigned char *buf, int bufsize) 4538 { 4539 int half = bufsize / 2; 4540 4541 unsigned char *tok_start = buf + half; 4542 unsigned char *tok_end = buf + bufsize; 4543 4544 p->toks = (jsmntok_t*)tok_start; 4545 p->toks_end = (jsmntok_t*)tok_end; 4546 p->num_tokens = 0; 4547 p->json = json; 4548 p->json_len = json_len; 4549 4550 // ndb_builder gets the first half of the buffer, and jsmn gets the 4551 // second half. I like this way of alloating memory (without actually 4552 // dynamically allocating memory). You get one big chunk upfront and 4553 // then submodules can recursively subdivide it. Maybe you could do 4554 // something even more clever like golden-ratio style subdivision where 4555 // the more important stuff gets a larger chunk and then it spirals 4556 // downward into smaller chunks. Thanks for coming to my TED talk. 4557 4558 if (!ndb_builder_init(&p->builder, buf, half)) 4559 return 0; 4560 4561 jsmn_init(&p->json_parser); 4562 4563 return 1; 4564 } 4565 4566 static inline int ndb_json_parser_parse(struct ndb_json_parser *p, 4567 struct ndb_id_cb *cb) 4568 { 4569 jsmntok_t *tok; 4570 int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); 4571 int res = 4572 jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); 4573 4574 // got an ID! 4575 if (res == -42) { 4576 tok = &p->toks[p->json_parser.toknext-1]; 4577 4578 switch (cb->fn(cb->data, p->json + tok->start)) { 4579 case NDB_IDRES_CONT: 4580 res = jsmn_parse(&p->json_parser, p->json, p->json_len, 4581 p->toks, cap, 0); 4582 break; 4583 case NDB_IDRES_STOP: 4584 return -42; 4585 } 4586 } else if (res == 0) { 4587 return 0; 4588 } 4589 4590 p->num_tokens = res; 4591 p->i = 0; 4592 4593 return 1; 4594 } 4595 4596 static inline int toksize(jsmntok_t *tok) 4597 { 4598 return tok->end - tok->start; 4599 } 4600 4601 4602 4603 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2) 4604 { 4605 switch (c2) { 4606 case 't': return cursor_push_byte(cur, '\t'); 4607 case 'n': return cursor_push_byte(cur, '\n'); 4608 case 'r': return cursor_push_byte(cur, '\r'); 4609 case 'b': return cursor_push_byte(cur, '\b'); 4610 case 'f': return cursor_push_byte(cur, '\f'); 4611 case '\\': return cursor_push_byte(cur, '\\'); 4612 case '/': return cursor_push_byte(cur, '/'); 4613 case '"': return cursor_push_byte(cur, '"'); 4614 case 'u': 4615 // these aren't handled yet 4616 return 0; 4617 default: 4618 return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2); 4619 } 4620 } 4621 4622 static int cursor_push_escaped_char(struct cursor *cur, char c) 4623 { 4624 switch (c) { 4625 case '"': return cursor_push_str(cur, "\\\""); 4626 case '\\': return cursor_push_str(cur, "\\\\"); 4627 case '\b': return cursor_push_str(cur, "\\b"); 4628 case '\f': return cursor_push_str(cur, "\\f"); 4629 case '\n': return cursor_push_str(cur, "\\n"); 4630 case '\r': return cursor_push_str(cur, "\\r"); 4631 case '\t': return cursor_push_str(cur, "\\t"); 4632 // TODO: \u hex hex hex hex 4633 } 4634 return cursor_push_byte(cur, c); 4635 } 4636 4637 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len) 4638 { 4639 int i; 4640 4641 if (len % 2 != 0) 4642 return 0; 4643 4644 if (!cursor_push_byte(cur, '"')) 4645 return 0; 4646 4647 for (i = 0; i < len; i++) { 4648 unsigned int c = ((const unsigned char *)buf)[i]; 4649 if (!cursor_push_byte(cur, hexchar(c >> 4))) 4650 return 0; 4651 if (!cursor_push_byte(cur, hexchar(c & 0xF))) 4652 return 0; 4653 } 4654 4655 if (!cursor_push_byte(cur, '"')) 4656 return 0; 4657 4658 return 1; 4659 } 4660 4661 static int cursor_push_jsonstr(struct cursor *cur, const char *str) 4662 { 4663 int i; 4664 int len; 4665 4666 len = strlen(str); 4667 4668 if (!cursor_push_byte(cur, '"')) 4669 return 0; 4670 4671 for (i = 0; i < len; i++) { 4672 if (!cursor_push_escaped_char(cur, str[i])) 4673 return 0; 4674 } 4675 4676 if (!cursor_push_byte(cur, '"')) 4677 return 0; 4678 4679 return 1; 4680 } 4681 4682 4683 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str) 4684 { 4685 if (str.flag == NDB_PACKED_ID) 4686 return cursor_push_hex_str(cur, str.id, 32); 4687 4688 return cursor_push_jsonstr(cur, str.str); 4689 } 4690 4691 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note, 4692 struct ndb_tag *tag) 4693 { 4694 int i; 4695 4696 if (!cursor_push_byte(cur, '[')) 4697 return 0; 4698 4699 for (i = 0; i < tag->count; i++) { 4700 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i))) 4701 return 0; 4702 if (i != tag->count-1 && !cursor_push_byte(cur, ',')) 4703 return 0; 4704 } 4705 4706 return cursor_push_byte(cur, ']'); 4707 } 4708 4709 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 4710 { 4711 int i; 4712 struct ndb_iterator iter, *it = &iter; 4713 ndb_tags_iterate_start(note, it); 4714 4715 if (!cursor_push_byte(cur, '[')) 4716 return 0; 4717 4718 i = 0; 4719 while (ndb_tags_iterate_next(it)) { 4720 if (!cursor_push_json_tag(cur, note, it->tag)) 4721 return 0; 4722 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 4723 return 0; 4724 i++; 4725 } 4726 4727 if (!cursor_push_byte(cur, ']')) 4728 return 0; 4729 4730 return 1; 4731 } 4732 4733 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen) 4734 { 4735 char timebuf[16] = {0}; 4736 char kindbuf[16] = {0}; 4737 char pubkey[65]; 4738 struct cursor cur; 4739 int ok; 4740 4741 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey)) 4742 return 0; 4743 4744 make_cursor(buf, buf + buflen, &cur); 4745 4746 // TODO: update in 2106 ... 4747 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 4748 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 4749 4750 ok = 4751 cursor_push_str(&cur, "[0,\"") && 4752 cursor_push_str(&cur, pubkey) && 4753 cursor_push_str(&cur, "\",") && 4754 cursor_push_str(&cur, timebuf) && 4755 cursor_push_str(&cur, ",") && 4756 cursor_push_str(&cur, kindbuf) && 4757 cursor_push_str(&cur, ",") && 4758 cursor_push_json_tags(&cur, ev) && 4759 cursor_push_str(&cur, ",") && 4760 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 4761 cursor_push_str(&cur, "]"); 4762 4763 if (!ok) 4764 return 0; 4765 4766 return cur.p - cur.start; 4767 } 4768 4769 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len) 4770 { 4771 int i; 4772 unsigned char chr; 4773 if (c->p + (len * 2) >= c->end) 4774 return 0; 4775 4776 for (i = 0; i < len; i++) { 4777 chr = bytes[i]; 4778 4779 *(c->p++) = hexchar(chr >> 4); 4780 *(c->p++) = hexchar(chr & 0xF); 4781 } 4782 4783 return 1; 4784 } 4785 4786 static int cursor_push_int_str(struct cursor *c, uint64_t num) 4787 { 4788 char timebuf[16] = {0}; 4789 snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num); 4790 return cursor_push_str(c, timebuf); 4791 } 4792 4793 int ndb_note_json(struct ndb_note *note, char *buf, int buflen) 4794 { 4795 struct cursor cur, *c = &cur; 4796 4797 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur); 4798 4799 int ok = cursor_push_str(c, "{\"id\":\"") && 4800 cursor_push_hex(c, ndb_note_id(note), 32) && 4801 cursor_push_str(c, "\",\"pubkey\":\"") && 4802 cursor_push_hex(c, ndb_note_pubkey(note), 32) && 4803 cursor_push_str(c, "\",\"created_at\":") && 4804 cursor_push_int_str(c, ndb_note_created_at(note)) && 4805 cursor_push_str(c, ",\"kind\":") && 4806 cursor_push_int_str(c, ndb_note_kind(note)) && 4807 cursor_push_str(c, ",\"tags\":") && 4808 cursor_push_json_tags(c, note) && 4809 cursor_push_str(c, ",\"content\":") && 4810 cursor_push_jsonstr(c, ndb_note_content(note)) && 4811 cursor_push_str(c, ",\"sig\":\"") && 4812 cursor_push_hex(c, ndb_note_sig(note), 64) && 4813 cursor_push_c_str(c, "\"}"); 4814 4815 if (!ok) { 4816 return 0; 4817 } 4818 4819 return cur.p - cur.start; 4820 } 4821 4822 static int cursor_push_json_elem_array(struct cursor *cur, 4823 const struct ndb_filter *filter, 4824 struct ndb_filter_elements *elems) 4825 { 4826 int i; 4827 unsigned char *id; 4828 const char *str; 4829 uint64_t val; 4830 4831 if (!cursor_push_byte(cur, '[')) 4832 return 0; 4833 4834 for (i = 0; i < elems->count; i++) { 4835 4836 switch (elems->field.elem_type) { 4837 case NDB_ELEMENT_STRING: 4838 str = ndb_filter_get_string_element(filter, elems, i); 4839 if (!cursor_push_jsonstr(cur, str)) 4840 return 0; 4841 break; 4842 case NDB_ELEMENT_ID: 4843 id = ndb_filter_get_id_element(filter, elems, i); 4844 if (!cursor_push_hex_str(cur, id, 32)) 4845 return 0; 4846 break; 4847 case NDB_ELEMENT_INT: 4848 val = ndb_filter_get_int_element(elems, i); 4849 if (!cursor_push_int_str(cur, val)) 4850 return 0; 4851 break; 4852 case NDB_ELEMENT_UNKNOWN: 4853 ndb_debug("unknown element in cursor_push_json_elem_array"); 4854 return 0; 4855 } 4856 4857 if (i != elems->count-1) { 4858 if (!cursor_push_byte(cur, ',')) 4859 return 0; 4860 } 4861 } 4862 4863 if (!cursor_push_str(cur, "]")) 4864 return 0; 4865 4866 return 1; 4867 } 4868 4869 int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen) 4870 { 4871 struct cursor cur, *c = &cur; 4872 struct ndb_filter_elements *elems; 4873 int i; 4874 4875 if (!filter->finalized) { 4876 ndb_debug("filter not finalized in ndb_filter_json\n"); 4877 return 0; 4878 } 4879 4880 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c); 4881 4882 if (!cursor_push_str(c, "{")) 4883 return 0; 4884 4885 for (i = 0; i < filter->num_elements; i++) { 4886 elems = ndb_filter_get_elements(filter, i); 4887 switch (elems->field.type) { 4888 case NDB_FILTER_IDS: 4889 if (!cursor_push_str(c, "\"ids\":")) 4890 return 0; 4891 if (!cursor_push_json_elem_array(c, filter, elems)) 4892 return 0; 4893 break; 4894 case NDB_FILTER_AUTHORS: 4895 if (!cursor_push_str(c, "\"authors\":")) 4896 return 0; 4897 if (!cursor_push_json_elem_array(c, filter, elems)) 4898 return 0; 4899 break; 4900 case NDB_FILTER_KINDS: 4901 if (!cursor_push_str(c, "\"kinds\":")) 4902 return 0; 4903 if (!cursor_push_json_elem_array(c, filter, elems)) 4904 return 0; 4905 break; 4906 case NDB_FILTER_TAGS: 4907 if (!cursor_push_str(c, "\"#")) 4908 return 0; 4909 if (!cursor_push_byte(c, elems->field.tag)) 4910 return 0; 4911 if (!cursor_push_str(c, "\":")) 4912 return 0; 4913 if (!cursor_push_json_elem_array(c, filter, elems)) 4914 return 0; 4915 break; 4916 case NDB_FILTER_SINCE: 4917 if (!cursor_push_str(c, "\"since\":")) 4918 return 0; 4919 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 4920 return 0; 4921 break; 4922 case NDB_FILTER_UNTIL: 4923 if (!cursor_push_str(c, "\"until\":")) 4924 return 0; 4925 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 4926 return 0; 4927 break; 4928 case NDB_FILTER_LIMIT: 4929 if (!cursor_push_str(c, "\"limit\":")) 4930 return 0; 4931 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 4932 return 0; 4933 break; 4934 } 4935 4936 if (i != filter->num_elements-1) { 4937 if (!cursor_push_byte(c, ',')) { 4938 return 0; 4939 } 4940 } 4941 4942 } 4943 4944 if (!cursor_push_c_str(c, "}")) 4945 return 0; 4946 4947 return cur.p - cur.start; 4948 } 4949 4950 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 4951 int len; 4952 4953 if (!(len = ndb_event_commitment(note, buf, buflen))) 4954 return 0; 4955 4956 //fprintf(stderr, "%.*s\n", len, buf); 4957 4958 sha256((struct sha256*)note->id, buf, len); 4959 4960 return 1; 4961 } 4962 4963 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 4964 unsigned char sig[64]) 4965 { 4966 unsigned char aux[32]; 4967 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 4968 4969 if (!fill_random(aux, sizeof(aux))) 4970 return 0; 4971 4972 secp256k1_context *ctx = 4973 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 4974 4975 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 4976 } 4977 4978 int ndb_create_keypair(struct ndb_keypair *kp) 4979 { 4980 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 4981 secp256k1_xonly_pubkey pubkey; 4982 4983 secp256k1_context *ctx = 4984 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 4985 4986 /* Try to create a keypair with a valid context, it should only 4987 * fail if the secret key is zero or out of range. */ 4988 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 4989 return 0; 4990 4991 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 4992 return 0; 4993 4994 /* Serialize the public key. Should always return 1 for a valid public key. */ 4995 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 4996 } 4997 4998 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 4999 { 5000 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 5001 fprintf(stderr, "could not hex decode secret key\n"); 5002 return 0; 5003 } 5004 5005 return ndb_create_keypair(keypair); 5006 } 5007 5008 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 5009 struct ndb_keypair *keypair) 5010 { 5011 int strings_len = builder->strings.p - builder->strings.start; 5012 unsigned char *note_end = builder->note_cur.p + strings_len; 5013 int total_size = note_end - builder->note_cur.start; 5014 5015 // move the strings buffer next to the end of our ndb_note 5016 memmove(builder->note_cur.p, builder->strings.start, strings_len); 5017 5018 // set the strings location 5019 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 5020 5021 // record the total size 5022 //builder->note->size = total_size; 5023 5024 *note = builder->note; 5025 5026 // generate id and sign if we're building this manually 5027 if (keypair) { 5028 // use the remaining memory for building our id buffer 5029 unsigned char *end = builder->mem.end; 5030 unsigned char *start = (unsigned char*)(*note) + total_size; 5031 5032 ndb_builder_set_pubkey(builder, keypair->pubkey); 5033 5034 if (!ndb_calculate_id(builder->note, start, end - start)) 5035 return 0; 5036 5037 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 5038 return 0; 5039 } 5040 5041 // make sure we're aligned as a whole 5042 total_size = (total_size + 7) & ~7; 5043 assert((total_size % 8) == 0); 5044 return total_size; 5045 } 5046 5047 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 5048 { 5049 return builder->note; 5050 } 5051 5052 static union ndb_packed_str ndb_offset_str(uint32_t offset) 5053 { 5054 // ensure accidents like -1 don't corrupt our packed_str 5055 union ndb_packed_str str; 5056 // most significant byte is reserved for ndb_packtype 5057 str.offset = offset & 0xFFFFFF; 5058 return str; 5059 } 5060 5061 5062 /// find an existing string via str_indices. these indices only exist in the 5063 /// builder phase just for this purpose. 5064 static inline int ndb_builder_find_str(struct ndb_builder *builder, 5065 const char *str, int len, 5066 union ndb_packed_str *pstr) 5067 { 5068 // find existing matching string to avoid duplicate strings 5069 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 5070 for (int i = 0; i < indices; i++) { 5071 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 5072 const char *some_str = (const char*)builder->strings.start + index; 5073 5074 if (!memcmp(some_str, str, len) && some_str[len] == '\0') { 5075 // found an existing matching str, use that index 5076 *pstr = ndb_offset_str(index); 5077 return 1; 5078 } 5079 } 5080 5081 return 0; 5082 } 5083 5084 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 5085 int len, union ndb_packed_str *pstr) 5086 { 5087 uint32_t loc; 5088 5089 // no string found, push a new one 5090 loc = builder->strings.p - builder->strings.start; 5091 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 5092 cursor_push_byte(&builder->strings, '\0'))) { 5093 return 0; 5094 } 5095 5096 *pstr = ndb_offset_str(loc); 5097 5098 // record in builder indices. ignore return value, if we can't cache it 5099 // then whatever 5100 cursor_push_u32(&builder->str_indices, loc); 5101 5102 return 1; 5103 } 5104 5105 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 5106 unsigned char *id, 5107 union ndb_packed_str *pstr) 5108 { 5109 // Don't both find id duplicates. very rarely are they duplicated 5110 // and it slows things down quite a bit. If we really care about this 5111 // We can switch to a hash table. 5112 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 5113 // pstr->packed.flag = NDB_PACKED_ID; 5114 // return 1; 5115 //} 5116 5117 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 5118 pstr->packed.flag = NDB_PACKED_ID; 5119 return 1; 5120 } 5121 5122 return 0; 5123 } 5124 5125 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2) 5126 { 5127 union ndb_packed_str str; 5128 str.packed.flag = NDB_PACKED_STR; 5129 str.packed.str[0] = c1; 5130 str.packed.str[1] = c2; 5131 str.packed.str[2] = '\0'; 5132 return str; 5133 } 5134 5135 static union ndb_packed_str ndb_char_to_packed_str(char c) 5136 { 5137 union ndb_packed_str str; 5138 str.packed.flag = NDB_PACKED_STR; 5139 str.packed.str[0] = c; 5140 str.packed.str[1] = '\0'; 5141 return str; 5142 } 5143 5144 5145 /// Check for small strings to pack 5146 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 5147 const char *str, int len, 5148 union ndb_packed_str *pstr, 5149 int pack_ids) 5150 { 5151 unsigned char id_buf[32]; 5152 5153 if (len == 0) { 5154 *pstr = ndb_char_to_packed_str(0); 5155 return 1; 5156 } else if (len == 1) { 5157 *pstr = ndb_char_to_packed_str(str[0]); 5158 return 1; 5159 } else if (len == 2) { 5160 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 5161 return 1; 5162 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 5163 return ndb_builder_push_packed_id(builder, id_buf, pstr); 5164 } 5165 5166 return 0; 5167 } 5168 5169 5170 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 5171 const char *str, int len, 5172 union ndb_packed_str *pstr) 5173 { 5174 if (ndb_builder_find_str(builder, str, len, pstr)) 5175 return 1; 5176 5177 return ndb_builder_push_str(builder, str, len, pstr); 5178 } 5179 5180 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 5181 union ndb_packed_str *pstr, int pack_ids) 5182 { 5183 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5184 return 1; 5185 5186 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 5187 } 5188 5189 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 5190 int len) 5191 { 5192 int pack_ids = 0; 5193 builder->note->content_length = len; 5194 return ndb_builder_make_str(builder, content, len, 5195 &builder->note->content, pack_ids); 5196 } 5197 5198 5199 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 5200 const char *s) 5201 { 5202 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 5203 memcmp(json + tok->start, s, tok_len) == 0) { 5204 return 1; 5205 } 5206 return 0; 5207 } 5208 5209 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 5210 union ndb_packed_str offset) 5211 { 5212 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 5213 return 0; 5214 builder->current_tag->count++; 5215 return 1; 5216 } 5217 5218 /// Unescape and push json strings 5219 static int ndb_builder_make_json_str(struct ndb_builder *builder, 5220 const char *str, int len, 5221 union ndb_packed_str *pstr, 5222 int *written, int pack_ids) 5223 { 5224 // let's not care about de-duping these. we should just unescape 5225 // in-place directly into the strings table. 5226 if (written) 5227 *written = len; 5228 5229 const char *p, *end, *start; 5230 unsigned char *builder_start; 5231 5232 // always try compact strings first 5233 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5234 return 1; 5235 5236 end = str + len; 5237 start = str; // Initialize start to the beginning of the string 5238 5239 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 5240 builder_start = builder->strings.p; 5241 5242 for (p = str; p < end; p++) { 5243 if (*p == '\\' && p+1 < end) { 5244 // Push the chunk of unescaped characters before this escape sequence 5245 if (start < p && !cursor_push(&builder->strings, 5246 (unsigned char *)start, 5247 p - start)) { 5248 return 0; 5249 } 5250 5251 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 5252 return 0; 5253 5254 p++; // Skip the character following the backslash 5255 start = p + 1; // Update the start pointer to the next character 5256 } 5257 } 5258 5259 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 5260 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 5261 p - start)) { 5262 return 0; 5263 } 5264 5265 if (written) 5266 *written = builder->strings.p - builder_start; 5267 5268 // TODO: dedupe these!? 5269 return cursor_push_byte(&builder->strings, '\0'); 5270 } 5271 5272 static int ndb_builder_push_json_tag(struct ndb_builder *builder, 5273 const char *str, int len) 5274 { 5275 union ndb_packed_str pstr; 5276 int pack_ids = 1; 5277 if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids)) 5278 return 0; 5279 return ndb_builder_finalize_tag(builder, pstr); 5280 } 5281 5282 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag 5283 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p, 5284 jsmntok_t *array) 5285 { 5286 jsmntok_t *str_tok; 5287 const char *str; 5288 5289 if (array->size == 0) 5290 return 0; 5291 5292 if (!ndb_builder_new_tag(&p->builder)) 5293 return 0; 5294 5295 for (int i = 0; i < array->size; i++) { 5296 str_tok = &array[i+1]; 5297 str = p->json + str_tok->start; 5298 5299 if (!ndb_builder_push_json_tag(&p->builder, str, 5300 toksize(str_tok))) { 5301 return 0; 5302 } 5303 } 5304 5305 return 1; 5306 } 5307 5308 // Push json tags into ndb data 5309 // [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags 5310 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p, 5311 jsmntok_t *array) 5312 { 5313 jsmntok_t *tag = array; 5314 5315 if (array->size == 0) 5316 return 1; 5317 5318 for (int i = 0; i < array->size; i++) { 5319 if (!ndb_builder_tag_from_json_array(p, &tag[i+1])) 5320 return 0; 5321 tag += tag[i+1].size; 5322 } 5323 5324 return 1; 5325 } 5326 5327 static int parse_unsigned_int(const char *start, int len, unsigned int *num) 5328 { 5329 unsigned int number = 0; 5330 const char *p = start, *end = start + len; 5331 int digits = 0; 5332 5333 while (p < end) { 5334 char c = *p; 5335 5336 if (c < '0' || c > '9') 5337 break; 5338 5339 // Check for overflow 5340 char digit = c - '0'; 5341 if (number > (UINT_MAX - digit) / 10) 5342 return 0; // Overflow detected 5343 5344 number = number * 10 + digit; 5345 5346 p++; 5347 digits++; 5348 } 5349 5350 if (digits == 0) 5351 return 0; 5352 5353 *num = number; 5354 return 1; 5355 } 5356 5357 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, 5358 unsigned char *buf, int bufsize, struct ndb_id_cb *cb) 5359 { 5360 jsmntok_t *tok = NULL; 5361 int tok_len, res; 5362 struct ndb_json_parser parser; 5363 5364 ndb_json_parser_init(&parser, json, len, buf, bufsize); 5365 5366 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 5367 return res; 5368 5369 if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY) 5370 return 0; 5371 5372 parser.i = 1; 5373 tok = &parser.toks[parser.i++]; 5374 tok_len = toksize(tok); 5375 if (tok->type != JSMN_STRING) 5376 return 0; 5377 5378 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 5379 fce->evtype = NDB_FCE_EVENT; 5380 struct ndb_event *ev = &fce->event; 5381 return ndb_parse_json_note(&parser, &ev->note); 5382 } 5383 5384 return 0; 5385 } 5386 5387 5388 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, 5389 unsigned char *buf, int bufsize, 5390 struct ndb_id_cb *cb) 5391 { 5392 jsmntok_t *tok = NULL; 5393 int tok_len, res; 5394 struct ndb_json_parser parser; 5395 5396 tce->subid_len = 0; 5397 tce->subid = ""; 5398 5399 ndb_json_parser_init(&parser, json, len, buf, bufsize); 5400 5401 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 5402 return res; 5403 5404 if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) 5405 return 0; 5406 5407 parser.i = 1; 5408 tok = &parser.toks[parser.i++]; 5409 tok_len = toksize(tok); 5410 if (tok->type != JSMN_STRING) 5411 return 0; 5412 5413 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 5414 tce->evtype = NDB_TCE_EVENT; 5415 struct ndb_event *ev = &tce->event; 5416 5417 tok = &parser.toks[parser.i++]; 5418 if (tok->type != JSMN_STRING) 5419 return 0; 5420 5421 tce->subid = json + tok->start; 5422 tce->subid_len = toksize(tok); 5423 5424 return ndb_parse_json_note(&parser, &ev->note); 5425 } else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 5426 tce->evtype = NDB_TCE_EOSE; 5427 5428 tok = &parser.toks[parser.i++]; 5429 if (tok->type != JSMN_STRING) 5430 return 0; 5431 5432 tce->subid = json + tok->start; 5433 tce->subid_len = toksize(tok); 5434 return 1; 5435 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 5436 if (parser.num_tokens != 5) 5437 return 0; 5438 5439 struct ndb_command_result *cr = &tce->command_result; 5440 5441 tce->evtype = NDB_TCE_OK; 5442 5443 tok = &parser.toks[parser.i++]; 5444 if (tok->type != JSMN_STRING) 5445 return 0; 5446 5447 tce->subid = json + tok->start; 5448 tce->subid_len = toksize(tok); 5449 5450 tok = &parser.toks[parser.i++]; 5451 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 5452 return 0; 5453 5454 cr->ok = (json + tok->start)[0] == 't'; 5455 5456 tok = &parser.toks[parser.i++]; 5457 if (tok->type != JSMN_STRING) 5458 return 0; 5459 5460 tce->command_result.msg = json + tok->start; 5461 tce->command_result.msglen = toksize(tok); 5462 5463 return 1; 5464 } else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) { 5465 tce->evtype = NDB_TCE_AUTH; 5466 5467 tok = &parser.toks[parser.i++]; 5468 if (tok->type != JSMN_STRING) 5469 return 0; 5470 5471 tce->subid = json + tok->start; 5472 tce->subid_len = toksize(tok); 5473 5474 return 1; 5475 } 5476 5477 return 0; 5478 } 5479 5480 static enum ndb_filter_fieldtype 5481 ndb_filter_parse_field(const char *tok, int len, char *tagchar) 5482 { 5483 *tagchar = 0; 5484 5485 if (len == 0) 5486 return 0; 5487 5488 if (len == 7 && !strncmp(tok, "authors", 7)) { 5489 return NDB_FILTER_AUTHORS; 5490 } else if (len == 3 && !strncmp(tok, "ids", 3)) { 5491 return NDB_FILTER_IDS; 5492 } else if (len == 5 && !strncmp(tok, "kinds", 5)) { 5493 return NDB_FILTER_KINDS; 5494 } else if (len == 2 && tok[0] == '#') { 5495 *tagchar = tok[1]; 5496 return NDB_FILTER_TAGS; 5497 } else if (len == 5 && !strncmp(tok, "since", 5)) { 5498 return NDB_FILTER_SINCE; 5499 } else if (len == 5 && !strncmp(tok, "until", 5)) { 5500 return NDB_FILTER_UNTIL; 5501 } else if (len == 5 && !strncmp(tok, "limit", 5)) { 5502 return NDB_FILTER_LIMIT; 5503 } 5504 5505 return 0; 5506 } 5507 5508 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser, 5509 struct ndb_filter *filter) 5510 { 5511 jsmntok_t *tok; 5512 const char *start; 5513 unsigned char hexbuf[32]; 5514 int tok_len, i, size; 5515 5516 tok = &parser->toks[parser->i++]; 5517 5518 if (tok->type != JSMN_ARRAY) { 5519 ndb_debug("parse_json_ids: not an array\n"); 5520 return 0; 5521 } 5522 5523 size = tok->size; 5524 5525 for (i = 0; i < size; parser->i++, i++) { 5526 tok = &parser->toks[parser->i]; 5527 start = parser->json + tok->start; 5528 tok_len = toksize(tok); 5529 5530 if (tok->type != JSMN_STRING) { 5531 ndb_debug("parse_json_ids: not a string '%d'\n", tok->type); 5532 return 0; 5533 } 5534 5535 if (tok_len != 64) { 5536 ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start); 5537 return 0; 5538 } 5539 5540 // id 5541 if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) { 5542 ndb_debug("parse_json_ids: hex decode failed\n"); 5543 return 0; 5544 } 5545 5546 ndb_debug("adding id elem\n"); 5547 if (!ndb_filter_add_id_element(filter, hexbuf)) { 5548 ndb_debug("parse_json_ids: failed to add id element\n"); 5549 return 0; 5550 } 5551 } 5552 5553 parser->i--; 5554 return 1; 5555 } 5556 5557 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser, 5558 struct ndb_filter *filter) 5559 { 5560 jsmntok_t *tok; 5561 const char *start; 5562 int tok_len; 5563 unsigned char hexbuf[32]; 5564 enum ndb_generic_element_type typ; 5565 tok = NULL; 5566 int i, size; 5567 5568 tok = &parser->toks[parser->i++]; 5569 5570 if (tok->type != JSMN_ARRAY) 5571 return 0; 5572 5573 size = tok->size; 5574 5575 for (i = 0; i < size; i++, parser->i++) { 5576 tok = &parser->toks[parser->i]; 5577 start = parser->json + tok->start; 5578 tok_len = toksize(tok); 5579 5580 if (tok->type != JSMN_STRING) 5581 return 0; 5582 5583 if (i == 0) { 5584 if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) { 5585 typ = NDB_ELEMENT_ID; 5586 if (!ndb_filter_add_id_element(filter, hexbuf)) { 5587 ndb_debug("failed to push id elem\n"); 5588 return 0; 5589 } 5590 } else { 5591 typ = NDB_ELEMENT_STRING; 5592 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 5593 return 0; 5594 } 5595 } else if (typ == NDB_ELEMENT_ID) { 5596 if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf))) 5597 return 0; 5598 if (!ndb_filter_add_id_element(filter, hexbuf)) 5599 return 0; 5600 } else if (typ == NDB_ELEMENT_STRING) { 5601 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 5602 return 0; 5603 } else { 5604 // ??? 5605 return 0; 5606 } 5607 } 5608 5609 parser->i--; 5610 return 1; 5611 } 5612 5613 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser, 5614 struct ndb_filter *filter) 5615 { 5616 jsmntok_t *tok; 5617 const char *start; 5618 int tok_len; 5619 unsigned int value; 5620 5621 tok = &parser->toks[parser->i]; 5622 start = parser->json + tok->start; 5623 tok_len = toksize(tok); 5624 5625 if (tok->type != JSMN_PRIMITIVE) 5626 return 0; 5627 5628 if (!parse_unsigned_int(start, tok_len, &value)) 5629 return 0; 5630 5631 if (!ndb_filter_add_int_element(filter, (uint64_t)value)) 5632 return 0; 5633 5634 ndb_debug("added int elem %d\n", value); 5635 5636 return 1; 5637 } 5638 5639 5640 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser, 5641 struct ndb_filter *filter) 5642 { 5643 jsmntok_t *tok; 5644 int size, i; 5645 5646 tok = &parser->toks[parser->i++]; 5647 5648 if (tok->type != JSMN_ARRAY) 5649 return 0; 5650 5651 size = tok->size; 5652 5653 for (i = 0; i < size; parser->i++, i++) { 5654 if (!ndb_filter_parse_json_int(parser, filter)) 5655 return 0; 5656 } 5657 5658 parser->i--; 5659 return 1; 5660 } 5661 5662 5663 static int ndb_filter_parse_json(struct ndb_json_parser *parser, 5664 struct ndb_filter *filter) 5665 { 5666 jsmntok_t *tok = NULL; 5667 const char *json = parser->json; 5668 const char *start; 5669 char tag; 5670 int tok_len; 5671 enum ndb_filter_fieldtype field; 5672 5673 if (parser->toks[parser->i++].type != JSMN_OBJECT) 5674 return 0; 5675 5676 for (; parser->i < parser->num_tokens; parser->i++) { 5677 tok = &parser->toks[parser->i++]; 5678 start = json + tok->start; 5679 tok_len = toksize(tok); 5680 5681 if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) { 5682 ndb_debug("failed field '%.*s'\n", tok_len, start); 5683 continue; 5684 } 5685 5686 if (tag) { 5687 ndb_debug("starting tag field '%c'\n", tag); 5688 if (!ndb_filter_start_tag_field(filter, tag)) { 5689 ndb_debug("failed to start tag field '%c'\n", tag); 5690 return 0; 5691 } 5692 } else if (!ndb_filter_start_field(filter, field)) { 5693 ndb_debug("field already started\n"); 5694 return 0; 5695 } 5696 5697 // we parsed a top-level field 5698 switch(field) { 5699 case NDB_FILTER_AUTHORS: 5700 case NDB_FILTER_IDS: 5701 if (!ndb_filter_parse_json_ids(parser, filter)) { 5702 ndb_debug("failed to parse filter ids/authors\n"); 5703 return 0; 5704 } 5705 break; 5706 case NDB_FILTER_SINCE: 5707 case NDB_FILTER_UNTIL: 5708 case NDB_FILTER_LIMIT: 5709 if (!ndb_filter_parse_json_int(parser, filter)) { 5710 ndb_debug("failed to parse filter since/until/limit\n"); 5711 return 0; 5712 } 5713 break; 5714 case NDB_FILTER_KINDS: 5715 if (!ndb_filter_parse_json_ints(parser, filter)) { 5716 ndb_debug("failed to parse filter kinds\n"); 5717 return 0; 5718 } 5719 break; 5720 case NDB_FILTER_TAGS: 5721 if (!ndb_filter_parse_json_elems(parser, filter)) { 5722 ndb_debug("failed to parse filter tags\n"); 5723 return 0; 5724 } 5725 break; 5726 } 5727 5728 ndb_filter_end_field(filter); 5729 } 5730 5731 return ndb_filter_end(filter); 5732 } 5733 5734 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 5735 { 5736 jsmntok_t *tok = NULL; 5737 unsigned char hexbuf[64]; 5738 const char *json = parser->json; 5739 const char *start; 5740 int i, tok_len, parsed; 5741 5742 parsed = 0; 5743 5744 if (parser->toks[parser->i].type != JSMN_OBJECT) 5745 return 0; 5746 5747 // TODO: build id buffer and verify at end 5748 5749 for (i = parser->i + 1; i < parser->num_tokens; i++) { 5750 tok = &parser->toks[i]; 5751 start = json + tok->start; 5752 tok_len = toksize(tok); 5753 5754 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 5755 if (tok_len == 0 || i + 1 >= parser->num_tokens) 5756 continue; 5757 5758 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 5759 // pubkey 5760 tok = &parser->toks[i+1]; 5761 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 5762 parsed |= NDB_PARSED_PUBKEY; 5763 ndb_builder_set_pubkey(&parser->builder, hexbuf); 5764 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 5765 // id 5766 tok = &parser->toks[i+1]; 5767 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 5768 parsed |= NDB_PARSED_ID; 5769 ndb_builder_set_id(&parser->builder, hexbuf); 5770 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 5771 // sig 5772 tok = &parser->toks[i+1]; 5773 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 5774 parsed |= NDB_PARSED_SIG; 5775 ndb_builder_set_sig(&parser->builder, hexbuf); 5776 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 5777 // kind 5778 tok = &parser->toks[i+1]; 5779 start = json + tok->start; 5780 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 5781 return 0; 5782 if (!parse_unsigned_int(start, toksize(tok), 5783 &parser->builder.note->kind)) 5784 return 0; 5785 parsed |= NDB_PARSED_KIND; 5786 } else if (start[0] == 'c') { 5787 if (jsoneq(json, tok, tok_len, "created_at")) { 5788 // created_at 5789 tok = &parser->toks[i+1]; 5790 start = json + tok->start; 5791 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 5792 return 0; 5793 // TODO: update to int64 in 2106 ... xD 5794 unsigned int bigi; 5795 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 5796 return 0; 5797 parser->builder.note->created_at = bigi; 5798 parsed |= NDB_PARSED_CREATED_AT; 5799 } else if (jsoneq(json, tok, tok_len, "content")) { 5800 // content 5801 tok = &parser->toks[i+1]; 5802 union ndb_packed_str pstr; 5803 tok_len = toksize(tok); 5804 int written, pack_ids = 0; 5805 if (!ndb_builder_make_json_str(&parser->builder, 5806 json + tok->start, 5807 tok_len, &pstr, 5808 &written, pack_ids)) { 5809 ndb_debug("ndb_builder_make_json_str failed\n"); 5810 return 0; 5811 } 5812 parser->builder.note->content_length = written; 5813 parser->builder.note->content = pstr; 5814 parsed |= NDB_PARSED_CONTENT; 5815 } 5816 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 5817 tok = &parser->toks[i+1]; 5818 ndb_builder_process_json_tags(parser, tok); 5819 i += tok->size; 5820 parsed |= NDB_PARSED_TAGS; 5821 } 5822 } 5823 5824 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 5825 if (parsed != NDB_PARSED_ALL) 5826 return 0; 5827 5828 return ndb_builder_finalize(&parser->builder, note, NULL); 5829 } 5830 5831 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter, 5832 unsigned char *buf, int bufsize) 5833 { 5834 struct ndb_json_parser parser; 5835 int res; 5836 5837 if (filter->finalized) 5838 return 0; 5839 5840 ndb_json_parser_init(&parser, json, len, buf, bufsize); 5841 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 5842 return res; 5843 5844 if (parser.num_tokens < 1) 5845 return 0; 5846 5847 return ndb_filter_parse_json(&parser, filter); 5848 } 5849 5850 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 5851 unsigned char *buf, int bufsize) 5852 { 5853 struct ndb_json_parser parser; 5854 int res; 5855 5856 ndb_json_parser_init(&parser, json, len, buf, bufsize); 5857 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 5858 return res; 5859 5860 if (parser.num_tokens < 1) 5861 return 0; 5862 5863 return ndb_parse_json_note(&parser, note); 5864 } 5865 5866 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 5867 { 5868 memcpy(builder->note->pubkey, pubkey, 32); 5869 } 5870 5871 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 5872 { 5873 memcpy(builder->note->id, id, 32); 5874 } 5875 5876 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 5877 { 5878 memcpy(builder->note->sig, sig, 64); 5879 } 5880 5881 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 5882 { 5883 builder->note->kind = kind; 5884 } 5885 5886 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 5887 { 5888 builder->note->created_at = created_at; 5889 } 5890 5891 int ndb_builder_new_tag(struct ndb_builder *builder) 5892 { 5893 builder->note->tags.count++; 5894 struct ndb_tag tag = {0}; 5895 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 5896 return cursor_push_tag(&builder->note_cur, &tag); 5897 } 5898 5899 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 5900 { 5901 counts->count = 0; 5902 counts->key_size = 0; 5903 counts->value_size = 0; 5904 } 5905 5906 static void ndb_stat_init(struct ndb_stat *stat) 5907 { 5908 // init stats 5909 int i; 5910 5911 for (i = 0; i < NDB_CKIND_COUNT; i++) { 5912 ndb_stat_counts_init(&stat->common_kinds[i]); 5913 } 5914 5915 for (i = 0; i < NDB_DBS; i++) { 5916 ndb_stat_counts_init(&stat->dbs[i]); 5917 } 5918 5919 ndb_stat_counts_init(&stat->other_kinds); 5920 } 5921 5922 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 5923 { 5924 int rc; 5925 MDB_cursor *cur; 5926 MDB_val k, v; 5927 MDB_dbi db; 5928 struct ndb_txn txn; 5929 struct ndb_note *note; 5930 int i; 5931 enum ndb_common_kind common_kind; 5932 5933 // initialize to 0 5934 ndb_stat_init(stat); 5935 5936 if (!ndb_begin_query(ndb, &txn)) { 5937 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 5938 return 0; 5939 } 5940 5941 // stat each dbi in the database 5942 for (i = 0; i < NDB_DBS; i++) 5943 { 5944 db = ndb->lmdb.dbs[i]; 5945 5946 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 5947 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 5948 mdb_strerror(rc)); 5949 return 0; 5950 } 5951 5952 // loop over every entry and count kv sizes 5953 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 5954 // we gather more detailed per-kind stats if we're in 5955 // the notes db 5956 if (i == NDB_DB_NOTE) { 5957 note = v.mv_data; 5958 common_kind = ndb_kind_to_common_kind(note->kind); 5959 5960 // uncommon kind? just count them in bulk 5961 if ((int)common_kind == -1) { 5962 stat->other_kinds.count++; 5963 stat->other_kinds.key_size += k.mv_size; 5964 stat->other_kinds.value_size += v.mv_size; 5965 } else { 5966 stat->common_kinds[common_kind].count++; 5967 stat->common_kinds[common_kind].key_size += k.mv_size; 5968 stat->common_kinds[common_kind].value_size += v.mv_size; 5969 } 5970 } 5971 5972 stat->dbs[i].count++; 5973 stat->dbs[i].key_size += k.mv_size; 5974 stat->dbs[i].value_size += v.mv_size; 5975 } 5976 5977 // close the cursor, they are per-dbi 5978 mdb_cursor_close(cur); 5979 } 5980 5981 ndb_end_query(&txn); 5982 5983 return 1; 5984 } 5985 5986 /// Push an element to the current tag 5987 /// 5988 /// Basic idea is to call ndb_builder_new_tag 5989 inline int ndb_builder_push_tag_str(struct ndb_builder *builder, 5990 const char *str, int len) 5991 { 5992 union ndb_packed_str pstr; 5993 int pack_ids = 1; 5994 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 5995 return 0; 5996 return ndb_builder_finalize_tag(builder, pstr); 5997 } 5998 5999 // 6000 // CONFIG 6001 // 6002 void ndb_default_config(struct ndb_config *config) 6003 { 6004 int cores = get_cpu_cores(); 6005 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 6006 config->ingester_threads = cores == -1 ? 4 : cores; 6007 config->flags = 0; 6008 config->ingest_filter = NULL; 6009 config->filter_context = NULL; 6010 config->sub_cb_ctx = NULL; 6011 config->sub_cb = NULL; 6012 } 6013 6014 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) 6015 { 6016 config->sub_cb_ctx = context; 6017 config->sub_cb = fn; 6018 } 6019 6020 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 6021 { 6022 config->ingester_threads = threads; 6023 } 6024 6025 void ndb_config_set_flags(struct ndb_config *config, int flags) 6026 { 6027 config->flags = flags; 6028 } 6029 6030 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 6031 { 6032 config->mapsize = mapsize; 6033 } 6034 6035 void ndb_config_set_ingest_filter(struct ndb_config *config, 6036 ndb_ingest_filter_fn fn, void *filter_ctx) 6037 { 6038 config->ingest_filter = fn; 6039 config->filter_context = filter_ctx; 6040 } 6041 6042 int ndb_print_tag_index(struct ndb_txn *txn) 6043 { 6044 MDB_cursor *cur; 6045 MDB_val k, v; 6046 int i; 6047 6048 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur)) 6049 return 0; 6050 6051 i = 1; 6052 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6053 printf("%d ", i); 6054 print_tag_kv(txn, &k, &v); 6055 i++; 6056 } 6057 6058 return 1; 6059 } 6060 6061 int ndb_print_kind_keys(struct ndb_txn *txn) 6062 { 6063 MDB_cursor *cur; 6064 MDB_val k, v; 6065 int i; 6066 struct ndb_u64_tsid *tsid; 6067 6068 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur)) 6069 return 0; 6070 6071 i = 1; 6072 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6073 tsid = k.mv_data; 6074 printf("%d note_kind %" PRIu64 " %" PRIu64 "\n", 6075 i, tsid->u64, tsid->timestamp); 6076 6077 i++; 6078 } 6079 6080 return 1; 6081 } 6082 6083 // used by ndb.c 6084 int ndb_print_search_keys(struct ndb_txn *txn) 6085 { 6086 MDB_cursor *cur; 6087 MDB_val k, v; 6088 int i; 6089 struct ndb_text_search_key search_key; 6090 6091 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 6092 return 0; 6093 6094 i = 1; 6095 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6096 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 6097 fprintf(stderr, "error decoding key %d\n", i); 6098 continue; 6099 } 6100 6101 ndb_print_text_search_key(&search_key); 6102 printf("\n"); 6103 6104 i++; 6105 } 6106 6107 return 1; 6108 } 6109 6110 struct ndb_tags *ndb_note_tags(struct ndb_note *note) 6111 { 6112 return ¬e->tags; 6113 } 6114 6115 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr) 6116 { 6117 struct ndb_str str; 6118 str.flag = pstr->packed.flag; 6119 6120 if (str.flag == NDB_PACKED_STR) { 6121 str.str = pstr->packed.str; 6122 return str; 6123 } 6124 6125 str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF); 6126 return str; 6127 } 6128 6129 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) 6130 { 6131 return ndb_note_str(note, &tag->strs[ind]); 6132 } 6133 6134 int ndb_str_len(struct ndb_str *str) 6135 { 6136 if (str->flag == NDB_PACKED_ID) 6137 return 32; 6138 return strlen(str->str); 6139 } 6140 6141 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind) 6142 { 6143 return ndb_tag_str(iter->note, iter->tag, ind); 6144 } 6145 6146 unsigned char * ndb_note_id(struct ndb_note *note) 6147 { 6148 return note->id; 6149 } 6150 6151 unsigned char * ndb_note_pubkey(struct ndb_note *note) 6152 { 6153 return note->pubkey; 6154 } 6155 6156 unsigned char * ndb_note_sig(struct ndb_note *note) 6157 { 6158 return note->sig; 6159 } 6160 6161 uint32_t ndb_note_created_at(struct ndb_note *note) 6162 { 6163 return note->created_at; 6164 } 6165 6166 uint32_t ndb_note_kind(struct ndb_note *note) 6167 { 6168 return note->kind; 6169 } 6170 6171 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind) 6172 { 6173 note->kind = kind; 6174 } 6175 6176 const char *ndb_note_content(struct ndb_note *note) 6177 { 6178 return ndb_note_str(note, ¬e->content).str; 6179 } 6180 6181 uint32_t ndb_note_content_length(struct ndb_note *note) 6182 { 6183 return note->content_length; 6184 } 6185 6186 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes) 6187 { 6188 struct ndb_note *note = (struct ndb_note *)bytes; 6189 if (note->version != 1) 6190 return 0; 6191 return note; 6192 } 6193 6194 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) 6195 { 6196 iter->note = note; 6197 iter->tag = NULL; 6198 iter->index = -1; 6199 } 6200 6201 int ndb_tags_iterate_next(struct ndb_iterator *iter) 6202 { 6203 struct ndb_tags *tags; 6204 6205 if (iter->tag == NULL || iter->index == -1) { 6206 iter->tag = iter->note->tags.tag; 6207 iter->index = 0; 6208 return iter->note->tags.count != 0; 6209 } 6210 6211 tags = &iter->note->tags; 6212 6213 if (++iter->index < tags->count) { 6214 uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); 6215 iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size); 6216 return 1; 6217 } 6218 6219 return 0; 6220 } 6221 6222 uint16_t ndb_tags_count(struct ndb_tags *tags) 6223 { 6224 return tags->count; 6225 } 6226 6227 uint16_t ndb_tag_count(struct ndb_tag *tags) 6228 { 6229 return tags->count; 6230 } 6231 6232 enum ndb_common_kind ndb_kind_to_common_kind(int kind) 6233 { 6234 switch (kind) 6235 { 6236 case 0: return NDB_CKIND_PROFILE; 6237 case 1: return NDB_CKIND_TEXT; 6238 case 3: return NDB_CKIND_CONTACTS; 6239 case 4: return NDB_CKIND_DM; 6240 case 5: return NDB_CKIND_DELETE; 6241 case 6: return NDB_CKIND_REPOST; 6242 case 7: return NDB_CKIND_REACTION; 6243 case 9735: return NDB_CKIND_ZAP; 6244 case 9734: return NDB_CKIND_ZAP_REQUEST; 6245 case 23194: return NDB_CKIND_NWC_REQUEST; 6246 case 23195: return NDB_CKIND_NWC_RESPONSE; 6247 case 27235: return NDB_CKIND_HTTP_AUTH; 6248 case 30000: return NDB_CKIND_LIST; 6249 case 30023: return NDB_CKIND_LONGFORM; 6250 case 30315: return NDB_CKIND_STATUS; 6251 } 6252 6253 return -1; 6254 } 6255 6256 const char *ndb_kind_name(enum ndb_common_kind ck) 6257 { 6258 switch (ck) { 6259 case NDB_CKIND_PROFILE: return "profile"; 6260 case NDB_CKIND_TEXT: return "text"; 6261 case NDB_CKIND_CONTACTS: return "contacts"; 6262 case NDB_CKIND_DM: return "dm"; 6263 case NDB_CKIND_DELETE: return "delete"; 6264 case NDB_CKIND_REPOST: return "repost"; 6265 case NDB_CKIND_REACTION: return "reaction"; 6266 case NDB_CKIND_ZAP: return "zap"; 6267 case NDB_CKIND_ZAP_REQUEST: return "zap_request"; 6268 case NDB_CKIND_NWC_REQUEST: return "nwc_request"; 6269 case NDB_CKIND_NWC_RESPONSE: return "nwc_response"; 6270 case NDB_CKIND_HTTP_AUTH: return "http_auth"; 6271 case NDB_CKIND_LIST: return "list"; 6272 case NDB_CKIND_LONGFORM: return "longform"; 6273 case NDB_CKIND_STATUS: return "status"; 6274 case NDB_CKIND_COUNT: return "unknown"; 6275 } 6276 6277 return "unknown"; 6278 } 6279 6280 const char *ndb_db_name(enum ndb_dbs db) 6281 { 6282 switch (db) { 6283 case NDB_DB_NOTE: 6284 return "note"; 6285 case NDB_DB_META: 6286 return "note_metadata"; 6287 case NDB_DB_PROFILE: 6288 return "profile"; 6289 case NDB_DB_NOTE_ID: 6290 return "note_index"; 6291 case NDB_DB_PROFILE_PK: 6292 return "profile_pubkey_index"; 6293 case NDB_DB_NDB_META: 6294 return "nostrdb_metadata"; 6295 case NDB_DB_PROFILE_SEARCH: 6296 return "profile_search"; 6297 case NDB_DB_PROFILE_LAST_FETCH: 6298 return "profile_last_fetch"; 6299 case NDB_DB_NOTE_KIND: 6300 return "note_kind_index"; 6301 case NDB_DB_NOTE_TEXT: 6302 return "note_fulltext"; 6303 case NDB_DB_NOTE_BLOCKS: 6304 return "note_blocks"; 6305 case NDB_DB_NOTE_TAGS: 6306 return "note_tags"; 6307 case NDB_DBS: 6308 return "count"; 6309 } 6310 6311 return "unknown"; 6312 } 6313 6314 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note) 6315 { 6316 const char *content; 6317 size_t content_len; 6318 struct ndb_blocks *blocks; 6319 6320 content = ndb_note_content(note); 6321 content_len = ndb_note_content_length(note); 6322 6323 // something weird is going on 6324 if (content_len >= INT32_MAX) 6325 return NULL; 6326 6327 unsigned char *buffer = malloc(content_len); 6328 if (!buffer) 6329 return NULL; 6330 6331 if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) { 6332 free(buffer); 6333 return NULL; 6334 } 6335 6336 //blocks = realloc(blocks, ndb_blocks_total_size(blocks)); 6337 //if (blocks == NULL) 6338 //return NULL; 6339 6340 blocks->flags |= NDB_BLOCK_FLAG_OWNED; 6341 6342 return blocks; 6343 } 6344 6345 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key) 6346 { 6347 struct ndb_blocks *blocks, *blocks_to_writer; 6348 size_t blocks_size; 6349 struct ndb_note *note; 6350 size_t note_len; 6351 6352 if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, ¬e_len))) { 6353 return blocks; 6354 } 6355 6356 // If we don't have note blocks, let's lazily generate them. This is 6357 // migration-friendly instead of doing them all at once 6358 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_len))) { 6359 // no note found, can't return note blocks 6360 return NULL; 6361 } 6362 6363 if (!(blocks = ndb_note_to_blocks(note))) 6364 return NULL; 6365 6366 // send a copy to the writer 6367 blocks_size = ndb_blocks_total_size(blocks); 6368 blocks_to_writer = malloc(blocks_size); 6369 memcpy(blocks_to_writer, blocks, blocks_size); 6370 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED); 6371 6372 // we generated new blocks, let's store them in the DB 6373 struct ndb_writer_blocks write_blocks = { 6374 .blocks = blocks_to_writer, 6375 .note_key = note_key 6376 }; 6377 6378 assert(write_blocks.blocks != blocks); 6379 6380 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS }; 6381 msg.blocks = write_blocks; 6382 6383 ndb_writer_queue_msg(&ndb->writer, &msg); 6384 6385 return blocks; 6386 } 6387 6388 struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index) 6389 { 6390 struct ndb_subscription *sub, *tsub; 6391 int i; 6392 6393 for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) { 6394 tsub = &ndb->monitor.subscriptions[i]; 6395 if (tsub->subid == subid) { 6396 sub = tsub; 6397 if (index) 6398 *index = i; 6399 break; 6400 } 6401 } 6402 6403 return sub; 6404 } 6405 6406 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 6407 int note_id_capacity) 6408 { 6409 struct ndb_subscription *sub; 6410 6411 if (subid == 0) 6412 return 0; 6413 6414 if (!(sub = ndb_find_subscription(ndb, subid, NULL))) 6415 return 0; 6416 6417 return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); 6418 } 6419 6420 int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 6421 int note_id_capacity) 6422 { 6423 struct ndb_subscription *sub; 6424 6425 // this is not a valid subscription id 6426 if (subid == 0) 6427 return 0; 6428 6429 if (!(sub = ndb_find_subscription(ndb, subid, NULL))) 6430 return 0; 6431 6432 return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity); 6433 } 6434 6435 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) 6436 { 6437 struct ndb_subscription *sub; 6438 int index, elems_to_move; 6439 6440 if (!(sub = ndb_find_subscription(ndb, subid, &index))) 6441 return 0; 6442 6443 ndb_subscription_destroy(sub); 6444 6445 elems_to_move = (--ndb->monitor.num_subscriptions) - index; 6446 6447 memmove(&ndb->monitor.subscriptions[index], 6448 &ndb->monitor.subscriptions[index+1], 6449 elems_to_move * sizeof(*sub)); 6450 6451 return 1; 6452 } 6453 6454 int ndb_num_subscriptions(struct ndb *ndb) 6455 { 6456 return ndb->monitor.num_subscriptions; 6457 } 6458 6459 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters) 6460 { 6461 static uint64_t subids = 0; 6462 struct ndb_subscription *sub; 6463 int index; 6464 size_t buflen; 6465 uint64_t subid; 6466 char *buf; 6467 6468 if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) { 6469 fprintf(stderr, "too many subscriptions\n"); 6470 return 0; 6471 } 6472 6473 index = ndb->monitor.num_subscriptions++; 6474 sub = &ndb->monitor.subscriptions[index]; 6475 subid = ++subids; 6476 sub->subid = subid; 6477 6478 ndb_filter_group_init(&sub->group); 6479 if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) 6480 return 0; 6481 6482 // 500k ought to be enough for anyone 6483 buflen = sizeof(uint64_t) * 65536; 6484 buf = malloc(buflen); 6485 6486 if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) { 6487 fprintf(stderr, "failed to push prot queue\n"); 6488 return 0; 6489 } 6490 6491 return subid; 6492 }