nostrdb.c (180991B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "ccan/crypto/sha256/sha256.h" 8 #include "bolt11/bolt11.h" 9 #include "bolt11/amount.h" 10 #include "lmdb.h" 11 #include "util.h" 12 #include "cpu.h" 13 #include "block.h" 14 #include "threadpool.h" 15 #include "thread.h" 16 #include "protected_queue.h" 17 #include "memchr.h" 18 #include "print_util.h" 19 #include <stdlib.h> 20 #include <limits.h> 21 #include <assert.h> 22 23 #include "bindings/c/profile_json_parser.h" 24 #include "bindings/c/profile_builder.h" 25 #include "bindings/c/meta_builder.h" 26 #include "bindings/c/meta_reader.h" 27 #include "bindings/c/profile_verifier.h" 28 #include "secp256k1.h" 29 #include "secp256k1_ecdh.h" 30 #include "secp256k1_schnorrsig.h" 31 32 #define max(a,b) ((a) > (b) ? (a) : (b)) 33 #define min(a,b) ((a) < (b) ? (a) : (b)) 34 35 // the maximum number of things threads pop and push in bulk 36 #define THREAD_QUEUE_BATCH 4096 37 38 // maximum number of active subscriptions 39 #define MAX_SUBSCRIPTIONS 256 40 #define MAX_SCAN_CURSORS 12 41 #define MAX_FILTERS 16 42 43 // the maximum size of inbox queues 44 static const int DEFAULT_QUEUE_SIZE = 32768; 45 46 // increase if we need bigger filters 47 #define NDB_FILTER_PAGES 64 48 49 #define ndb_flag_set(flags, f) ((flags & f) == f) 50 51 #define NDB_PARSED_ID (1 << 0) 52 #define NDB_PARSED_PUBKEY (1 << 1) 53 #define NDB_PARSED_SIG (1 << 2) 54 #define NDB_PARSED_CREATED_AT (1 << 3) 55 #define NDB_PARSED_KIND (1 << 4) 56 #define NDB_PARSED_CONTENT (1 << 5) 57 #define NDB_PARSED_TAGS (1 << 6) 58 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 59 60 typedef int (*ndb_migrate_fn)(struct ndb_txn *); 61 typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, 62 int word_index); 63 64 // these must be byte-aligned, they are directly accessing the serialized data 65 // representation 66 #pragma pack(push, 1) 67 68 union ndb_packed_str { 69 struct { 70 char str[3]; 71 // we assume little endian everywhere. sorry not sorry. 72 unsigned char flag; // NDB_PACKED_STR, etc 73 } packed; 74 75 uint32_t offset; 76 unsigned char bytes[4]; 77 }; 78 79 struct ndb_tag { 80 uint16_t count; 81 union ndb_packed_str strs[0]; 82 }; 83 84 struct ndb_tags { 85 uint16_t padding; 86 uint16_t count; 87 }; 88 89 // v1 90 struct ndb_note { 91 unsigned char version; // v=1 92 unsigned char padding[3]; // keep things aligned 93 unsigned char id[32]; 94 unsigned char pubkey[32]; 95 unsigned char sig[64]; 96 97 uint64_t created_at; 98 uint32_t kind; 99 uint32_t content_length; 100 union ndb_packed_str content; 101 uint32_t strings; 102 // nothing can come after tags since it contains variadic data 103 struct ndb_tags tags; 104 }; 105 106 #pragma pack(pop) 107 108 109 struct ndb_migration { 110 ndb_migrate_fn fn; 111 }; 112 113 struct ndb_profile_record_builder { 114 flatcc_builder_t *builder; 115 void *flatbuf; 116 }; 117 118 // controls whether to continue or stop the json parser 119 enum ndb_idres { 120 NDB_IDRES_CONT, 121 NDB_IDRES_STOP, 122 }; 123 124 // closure data for the id-detecting ingest controller 125 struct ndb_ingest_controller 126 { 127 MDB_txn *read_txn; 128 struct ndb_lmdb *lmdb; 129 }; 130 131 enum ndb_writer_msgtype { 132 NDB_WRITER_QUIT, // kill thread immediately 133 NDB_WRITER_NOTE, // write a note to the db 134 NDB_WRITER_PROFILE, // write a profile to the db 135 NDB_WRITER_DBMETA, // write ndb metadata 136 NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched 137 NDB_WRITER_BLOCKS, // write parsed note blocks 138 NDB_WRITER_MIGRATE, // migrate the database 139 }; 140 141 // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) 142 enum ndb_meta_key { 143 NDB_META_KEY_VERSION = 1 144 }; 145 146 struct ndb_json_parser { 147 const char *json; 148 int json_len; 149 struct ndb_builder builder; 150 jsmn_parser json_parser; 151 jsmntok_t *toks, *toks_end; 152 int i; 153 int num_tokens; 154 }; 155 156 // useful to pass to threads on its own 157 struct ndb_lmdb { 158 MDB_env *env; 159 MDB_dbi dbs[NDB_DBS]; 160 }; 161 162 struct ndb_writer { 163 struct ndb_lmdb *lmdb; 164 struct ndb_monitor *monitor; 165 166 uint32_t ndb_flags; 167 void *queue_buf; 168 int queue_buflen; 169 pthread_t thread_id; 170 171 struct prot_queue inbox; 172 }; 173 174 struct ndb_ingester { 175 struct ndb_lmdb *lmdb; 176 uint32_t flags; 177 struct threadpool tp; 178 struct prot_queue *writer_inbox; 179 void *filter_context; 180 ndb_ingest_filter_fn filter; 181 }; 182 183 struct ndb_filter_group { 184 struct ndb_filter filters[MAX_FILTERS]; 185 int num_filters; 186 }; 187 188 struct ndb_subscription { 189 uint64_t subid; 190 struct ndb_filter_group group; 191 struct prot_queue inbox; 192 }; 193 194 struct ndb_monitor { 195 struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS]; 196 ndb_sub_fn sub_cb; 197 void *sub_cb_ctx; 198 int num_subscriptions; 199 200 // monitor isn't a full inbox. We want pollers to be able to poll 201 // subscriptions efficiently without going through a message queue, so 202 // we use a simple mutex here. 203 pthread_mutex_t mutex; 204 }; 205 206 struct ndb { 207 struct ndb_lmdb lmdb; 208 struct ndb_ingester ingester; 209 struct ndb_monitor monitor; 210 struct ndb_writer writer; 211 int version; 212 uint32_t flags; // setting flags 213 // lmdb environ handles, etc 214 }; 215 216 /// 217 /// Query Plans 218 /// 219 /// There are general strategies for performing certain types of query 220 /// depending on the filter. For example, for large contact list queries 221 /// with many authors, we simply do a descending scan on created_at 222 /// instead of doing 1000s of pubkey scans. 223 /// 224 /// Query plans are calculated from filters via `ndb_filter_plan` 225 /// 226 enum ndb_query_plan { 227 NDB_PLAN_KINDS, 228 NDB_PLAN_IDS, 229 NDB_PLAN_AUTHORS, 230 NDB_PLAN_AUTHOR_KINDS, 231 NDB_PLAN_CREATED, 232 NDB_PLAN_TAGS, 233 NDB_PLAN_SEARCH, 234 }; 235 236 // A id + u64 + timestamp 237 struct ndb_id_u64_ts { 238 unsigned char id[32]; // pubkey, id, etc 239 uint64_t u64; // kind, etc 240 uint64_t timestamp; 241 }; 242 243 // A clustered key with an id and a timestamp 244 struct ndb_tsid { 245 unsigned char id[32]; 246 uint64_t timestamp; 247 }; 248 249 // A u64 + timestamp id. Just using this for kinds at the moment. 250 struct ndb_u64_ts { 251 uint64_t u64; // kind, etc 252 uint64_t timestamp; 253 }; 254 255 struct ndb_word 256 { 257 const char *word; 258 int word_len; 259 }; 260 261 struct ndb_search_words 262 { 263 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 264 int num_words; 265 }; 266 267 268 static inline int is_replaceable_kind(uint64_t kind) 269 { 270 return kind == 0 || kind == 3 271 || (10000 <= kind && kind < 20000) 272 || (30000 <= kind && kind < 40000); 273 } 274 275 276 // ndb_text_search_key 277 // 278 // This is compressed when in lmdb: 279 // 280 // note_id: varint 281 // strlen: varint 282 // str: cstr 283 // timestamp: varint 284 // word_index: varint 285 // 286 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 287 int word_index, int word_len, const char *str, 288 uint64_t timestamp, uint64_t note_id, 289 int *keysize) 290 { 291 struct cursor cur; 292 make_cursor(buf, buf + bufsize, &cur); 293 294 // TODO: need update this to uint64_t 295 // we push this first because our query function can pull this off 296 // quickly to check matches 297 if (!cursor_push_varint(&cur, (int32_t)note_id)) 298 return 0; 299 300 // string length 301 if (!cursor_push_varint(&cur, word_len)) 302 return 0; 303 304 // non-null terminated, lowercase string 305 if (!cursor_push_lowercase(&cur, str, word_len)) 306 return 0; 307 308 // TODO: need update this to uint64_t 309 if (!cursor_push_varint(&cur, (int)timestamp)) 310 return 0; 311 312 // the index of the word in the content so that we can do more accurate 313 // phrase searches 314 if (!cursor_push_varint(&cur, word_index)) 315 return 0; 316 317 // pad to 8-byte alignment 318 if (!cursor_align(&cur, 8)) 319 return 0; 320 321 *keysize = cur.p - cur.start; 322 assert((*keysize % 8) == 0); 323 324 return 1; 325 } 326 327 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 328 int wordlen, const char *word, 329 uint64_t timestamp, uint64_t note_id, 330 int *keysize) 331 { 332 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 333 timestamp, note_id, keysize); 334 } 335 336 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 337 int wordlen, const char *word, 338 uint64_t since, 339 int *keysize) 340 { 341 uint64_t note_id; 342 note_id = 0; 343 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 344 since, note_id, keysize); 345 } 346 347 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 348 int wordlen, const char *word, 349 uint64_t until, 350 int *keysize) 351 { 352 uint64_t note_id; 353 note_id = INT32_MAX; 354 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 355 until, note_id, keysize); 356 } 357 358 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, uint64_t timestamp, int *keysize); 359 360 /** From LMDB: Compare two items lexically */ 361 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 362 int diff; 363 ssize_t len_diff; 364 unsigned int len; 365 366 len = a->mv_size; 367 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 368 if (len_diff > 0) { 369 len = b->mv_size; 370 len_diff = 1; 371 } 372 373 diff = memcmp(a->mv_data, b->mv_data, len); 374 return diff ? diff : len_diff<0 ? -1 : len_diff; 375 } 376 377 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) 378 { 379 MDB_val va, vb; 380 uint64_t ts_a, ts_b; 381 int cmp; 382 383 va.mv_data = a->mv_data; 384 va.mv_size = a->mv_size - 8; 385 386 vb.mv_data = b->mv_data; 387 vb.mv_size = b->mv_size - 8; 388 389 if ((cmp = mdb_cmp_memn(&va, &vb))) 390 return cmp; 391 392 ts_a = *(uint64_t*)((unsigned char *)va.mv_data + va.mv_size); 393 ts_b = *(uint64_t*)((unsigned char *)vb.mv_data + vb.mv_size); 394 395 if (ts_a < ts_b) 396 return -1; 397 else if (ts_a > ts_b) 398 return 1; 399 400 return 0; 401 } 402 403 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 404 { 405 struct cursor ca, cb; 406 uint64_t sa, sb, nid_a, nid_b; 407 MDB_val a2, b2; 408 409 make_cursor(a->mv_data, (unsigned char *)a->mv_data + a->mv_size, &ca); 410 make_cursor(b->mv_data, (unsigned char *)b->mv_data + b->mv_size, &cb); 411 412 // note_id 413 if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b))) 414 return 0; 415 416 // string size 417 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 418 return 0; 419 420 a2.mv_data = ca.p; 421 a2.mv_size = sa; 422 423 b2.mv_data = cb.p; 424 b2.mv_size = sb; 425 426 int cmp = mdb_cmp_memn(&a2, &b2); 427 if (cmp) return cmp; 428 429 // skip over string 430 ca.p += sa; 431 cb.p += sb; 432 433 // timestamp 434 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 435 return 0; 436 437 if (sa < sb) return -1; 438 else if (sa > sb) return 1; 439 440 // note_id 441 if (nid_a < nid_b) return -1; 442 else if (nid_a > nid_b) return 1; 443 444 // word index 445 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 446 return 0; 447 448 if (sa < sb) return -1; 449 else if (sa > sb) return 1; 450 451 return 0; 452 } 453 454 static inline int ndb_unpack_text_search_key_noteid( 455 struct cursor *cur, uint64_t *note_id) 456 { 457 if (!cursor_pull_varint(cur, note_id)) 458 return 0; 459 460 return 1; 461 } 462 463 // faster peek of just the string instead of unpacking everything 464 // this is used to quickly discard range query matches if there is no 465 // common prefix 466 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 467 const char **str, 468 int *str_len) 469 { 470 uint64_t len; 471 472 if (!cursor_pull_varint(cur, &len)) 473 return 0; 474 475 *str_len = len; 476 477 *str = (const char *)cur->p; 478 479 if (!cursor_skip(cur, *str_len)) 480 return 0; 481 482 return 1; 483 } 484 485 // should be called after ndb_unpack_text_search_key_string. It continues 486 // the unpacking of a text search key if we've already started it. 487 static inline int 488 ndb_unpack_remaining_text_search_key(struct cursor *cur, 489 struct ndb_text_search_key *key) 490 { 491 if (!cursor_pull_varint(cur, &key->timestamp)) 492 return 0; 493 494 if (!cursor_pull_varint(cur, &key->word_index)) 495 return 0; 496 497 return 1; 498 } 499 500 // unpack a fulltext search key 501 // 502 // full version of string + unpack remaining. This is split up because text 503 // searching only requires to pull the string for prefix searching, and the 504 // remaining is optional 505 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 506 struct ndb_text_search_key *key) 507 { 508 struct cursor c; 509 make_cursor(p, p + len, &c); 510 511 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 512 return 0; 513 514 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 515 return 0; 516 517 return ndb_unpack_remaining_text_search_key(&c, key); 518 } 519 520 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 521 // `dst` and `src` are pointers to the destination and source strings, respectively. 522 // `n` is the maximum number of characters to copy. 523 static void lowercase_strncpy(char *dst, const char *src, int n) { 524 int j = 0, i = 0; 525 526 if (!dst || !src || n == 0) { 527 return; 528 } 529 530 while (src[i] != '\0' && j < n) { 531 dst[j++] = tolower(src[i++]); 532 } 533 534 // Null-terminate and fill the destination string 535 while (j < n) { 536 dst[j++] = '\0'; 537 } 538 } 539 540 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) { 541 return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID; 542 } 543 544 // Copy the filter 545 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src) 546 { 547 size_t src_size, elem_size, data_size; 548 549 memcpy(dst, src, sizeof(*src)); 550 551 elem_size = src->elem_buf.end - src->elem_buf.start; 552 data_size = src->data_buf.end - src->data_buf.start; 553 src_size = data_size + elem_size; 554 555 // let's only allow finalized filters to be cloned 556 if (!src || !src->finalized) 557 return 0; 558 559 dst->elem_buf.start = malloc(src_size); 560 dst->elem_buf.end = dst->elem_buf.start + elem_size; 561 dst->elem_buf.p = dst->elem_buf.end; 562 563 dst->data_buf.start = dst->elem_buf.start + elem_size; 564 dst->data_buf.end = dst->data_buf.start + data_size; 565 dst->data_buf.p = dst->data_buf.end; 566 567 if (dst->elem_buf.start == NULL) 568 return 0; 569 570 memcpy(dst->elem_buf.start, src->elem_buf.start, src_size); 571 572 return 1; 573 } 574 575 // "Finalize" the filter. This resizes the allocated heap buffers so that they 576 // are as small as possible. This also prevents new fields from being added 577 int ndb_filter_end(struct ndb_filter *filter) 578 { 579 #ifdef NDB_LOG 580 size_t orig_size; 581 #endif 582 size_t data_len, elem_len; 583 if (filter->finalized == 1) 584 return 0; 585 586 // move the data buffer to the end of the element buffer and update 587 // all of the element pointers accordingly 588 data_len = filter->data_buf.p - filter->data_buf.start; 589 elem_len = filter->elem_buf.p - filter->elem_buf.start; 590 #ifdef NDB_LOG 591 orig_size = filter->data_buf.end - filter->elem_buf.start; 592 #endif 593 594 // cap the elem buff 595 filter->elem_buf.end = filter->elem_buf.p; 596 597 // move the data buffer to the end of the element buffer 598 memmove(filter->elem_buf.p, filter->data_buf.start, data_len); 599 600 // realloc the whole thing 601 filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len); 602 filter->elem_buf.end = filter->elem_buf.start + elem_len; 603 filter->elem_buf.p = filter->elem_buf.end; 604 605 filter->data_buf.start = filter->elem_buf.end; 606 filter->data_buf.end = filter->data_buf.start + data_len; 607 filter->data_buf.p = filter->data_buf.end; 608 609 filter->finalized = 1; 610 611 ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len); 612 613 return 1; 614 } 615 616 static inline struct ndb_filter_elements * 617 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset) 618 { 619 struct ndb_filter_elements *els; 620 621 if (offset < 0) 622 return NULL; 623 624 els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset); 625 626 if ((unsigned char *)els > filter->elem_buf.p) 627 return NULL; 628 629 return els; 630 } 631 632 struct ndb_filter_elements * 633 ndb_filter_current_element(const struct ndb_filter *filter) 634 { 635 return ndb_filter_get_elements_by_offset(filter, filter->current); 636 } 637 638 struct ndb_filter_elements * 639 ndb_filter_get_elements(const struct ndb_filter *filter, int index) 640 { 641 if (filter->num_elements <= 0) 642 return NULL; 643 644 if (index > filter->num_elements-1) 645 return NULL; 646 647 return ndb_filter_get_elements_by_offset(filter, filter->elements[index]); 648 } 649 650 static inline unsigned char * 651 ndb_filter_elements_data(const struct ndb_filter *filter, int offset) 652 { 653 unsigned char *data; 654 655 if (offset < 0) 656 return NULL; 657 658 data = filter->data_buf.start + offset; 659 if (data > filter->data_buf.p) 660 return NULL; 661 662 return data; 663 } 664 665 unsigned char * 666 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 667 { 668 return ndb_filter_elements_data(filter, els->elements[index]); 669 } 670 671 const char * 672 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 673 { 674 return (const char *)ndb_filter_elements_data(filter, els->elements[index]); 675 } 676 677 uint64_t * 678 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index) 679 { 680 return &els->elements[index]; 681 } 682 683 uint64_t 684 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index) 685 { 686 return els->elements[index]; 687 } 688 689 int ndb_filter_init_with(struct ndb_filter *filter, int pages) 690 { 691 struct cursor cur; 692 int page_size, elem_size, data_size, buf_size; 693 694 page_size = 4096; // assuming this, not a big deal if we're wrong 695 696 buf_size = page_size * pages; 697 elem_size = buf_size / 4; 698 data_size = buf_size - elem_size; 699 700 unsigned char *buf = malloc(buf_size); 701 if (!buf) 702 return 0; 703 704 // init memory arena for the cursor 705 make_cursor(buf, buf + buf_size, &cur); 706 707 cursor_slice(&cur, &filter->elem_buf, elem_size); 708 cursor_slice(&cur, &filter->data_buf, data_size); 709 710 // make sure we are fully allocated 711 assert(cur.p == cur.end); 712 713 // make sure elem_buf is the start of the buffer 714 assert(filter->elem_buf.start == cur.start); 715 716 filter->num_elements = 0; 717 filter->elements[0] = 0; 718 filter->current = -1; 719 filter->finalized = 0; 720 721 return 1; 722 } 723 724 int ndb_filter_init(struct ndb_filter *filter) { 725 return ndb_filter_init_with(filter, NDB_FILTER_PAGES); 726 } 727 728 void ndb_filter_destroy(struct ndb_filter *filter) 729 { 730 if (filter->elem_buf.start) 731 free(filter->elem_buf.start); 732 733 memset(filter, 0, sizeof(*filter)); 734 } 735 736 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 737 { 738 switch (field) { 739 case NDB_FILTER_IDS: return "ids"; 740 case NDB_FILTER_AUTHORS: return "authors"; 741 case NDB_FILTER_KINDS: return "kinds"; 742 case NDB_FILTER_TAGS: return "tags"; 743 case NDB_FILTER_SINCE: return "since"; 744 case NDB_FILTER_UNTIL: return "until"; 745 case NDB_FILTER_LIMIT: return "limit"; 746 case NDB_FILTER_SEARCH: return "search"; 747 } 748 749 return "unknown"; 750 } 751 752 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag) 753 { 754 int i; 755 struct ndb_filter_elements *els, *el; 756 757 if (ndb_filter_current_element(filter)) { 758 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 759 return 0; 760 } 761 762 // you can only start and end fields once 763 for (i = 0; i < filter->num_elements; i++) { 764 el = ndb_filter_get_elements(filter, i); 765 assert(el); 766 // TODO: fix this tags check to try to find matching tags 767 if (el->field.type == field && field != NDB_FILTER_TAGS) { 768 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 769 ndb_filter_field_name(field)); 770 return 0; 771 } 772 } 773 774 filter->current = filter->elem_buf.p - filter->elem_buf.start; 775 els = ndb_filter_current_element(filter); 776 assert(els); 777 778 // advance elem buffer to the variable data section 779 if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 780 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 781 ndb_filter_field_name(field)); 782 return 0; 783 } 784 785 els->field.type = field; 786 els->field.tag = tag; 787 els->field.elem_type = 0; 788 els->count = 0; 789 790 return 1; 791 } 792 793 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 794 { 795 return ndb_filter_start_field_impl(filter, field, 0); 796 } 797 798 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag) 799 { 800 return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag); 801 } 802 803 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 804 { 805 struct ndb_filter_elements *current; 806 uint64_t offset; 807 808 if (!(current = ndb_filter_current_element(filter))) 809 return 0; 810 811 offset = filter->data_buf.p - filter->data_buf.start; 812 813 switch (current->field.type) { 814 case NDB_FILTER_IDS: 815 case NDB_FILTER_AUTHORS: 816 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 817 return 0; 818 break; 819 case NDB_FILTER_KINDS: 820 offset = el.integer; 821 break; 822 case NDB_FILTER_SINCE: 823 case NDB_FILTER_UNTIL: 824 case NDB_FILTER_LIMIT: 825 // only one allowed for since/until 826 if (current->count != 0) 827 return 0; 828 offset = el.integer; 829 break; 830 case NDB_FILTER_SEARCH: 831 if (current->field.elem_type != NDB_ELEMENT_STRING) { 832 return 0; 833 } 834 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 835 return 0; 836 if (!cursor_push_byte(&filter->data_buf, 0)) 837 return 0; 838 break; 839 case NDB_FILTER_TAGS: 840 switch (current->field.elem_type) { 841 case NDB_ELEMENT_ID: 842 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 843 return 0; 844 break; 845 case NDB_ELEMENT_STRING: 846 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 847 return 0; 848 if (!cursor_push_byte(&filter->data_buf, 0)) 849 return 0; 850 break; 851 case NDB_ELEMENT_INT: 852 // ints are not allowed in tag filters 853 case NDB_ELEMENT_UNKNOWN: 854 return 0; 855 } 856 // push a pointer of the string in the databuf as an element 857 break; 858 } 859 860 if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset, 861 sizeof(offset))) { 862 return 0; 863 } 864 865 current->count++; 866 867 return 1; 868 } 869 870 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 871 enum ndb_generic_element_type elem_type) 872 { 873 enum ndb_generic_element_type current_elem_type; 874 struct ndb_filter_elements *current; 875 876 if (!(current = ndb_filter_current_element(filter))) 877 return 0; 878 879 current_elem_type = current->field.elem_type; 880 881 // element types must be uniform 882 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 883 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 884 return 0; 885 } 886 887 current->field.elem_type = elem_type; 888 889 return 1; 890 } 891 892 893 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len) 894 { 895 union ndb_filter_element el; 896 struct ndb_filter_elements *current; 897 898 if (!(current = ndb_filter_current_element(filter))) 899 return 0; 900 901 // only generic tags and search queries are allowed to have strings 902 switch (current->field.type) { 903 case NDB_FILTER_SINCE: 904 case NDB_FILTER_UNTIL: 905 case NDB_FILTER_LIMIT: 906 case NDB_FILTER_IDS: 907 case NDB_FILTER_AUTHORS: 908 case NDB_FILTER_KINDS: 909 return 0; 910 case NDB_FILTER_SEARCH: 911 if (current->count == 1) { 912 // you can't add more than one string to a search 913 return 0; 914 } 915 break; 916 case NDB_FILTER_TAGS: 917 break; 918 } 919 920 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 921 return 0; 922 923 el.string.string = str; 924 el.string.len = len; 925 926 return ndb_filter_add_element(filter, el); 927 } 928 929 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 930 { 931 return ndb_filter_add_str_element_len(filter, str, strlen(str)); 932 } 933 934 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 935 { 936 union ndb_filter_element el; 937 struct ndb_filter_elements *current; 938 if (!(current = ndb_filter_current_element(filter))) 939 return 0; 940 941 switch (current->field.type) { 942 case NDB_FILTER_IDS: 943 case NDB_FILTER_AUTHORS: 944 case NDB_FILTER_TAGS: 945 case NDB_FILTER_SEARCH: 946 return 0; 947 case NDB_FILTER_KINDS: 948 case NDB_FILTER_SINCE: 949 case NDB_FILTER_UNTIL: 950 case NDB_FILTER_LIMIT: 951 break; 952 } 953 954 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT)) 955 return 0; 956 957 el.integer = integer; 958 959 return ndb_filter_add_element(filter, el); 960 } 961 962 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 963 { 964 union ndb_filter_element el; 965 struct ndb_filter_elements *current; 966 967 if (!(current = ndb_filter_current_element(filter))) 968 return 0; 969 970 // only certain filter types allow pushing id elements 971 switch (current->field.type) { 972 case NDB_FILTER_SINCE: 973 case NDB_FILTER_UNTIL: 974 case NDB_FILTER_LIMIT: 975 case NDB_FILTER_KINDS: 976 case NDB_FILTER_SEARCH: 977 return 0; 978 case NDB_FILTER_IDS: 979 case NDB_FILTER_AUTHORS: 980 case NDB_FILTER_TAGS: 981 break; 982 } 983 984 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 985 return 0; 986 987 // this is needed so that generic filters know its an id 988 el.id = id; 989 990 return ndb_filter_add_element(filter, el); 991 } 992 993 static int ndb_tag_filter_matches(struct ndb_filter *filter, 994 struct ndb_filter_elements *els, 995 struct ndb_note *note) 996 { 997 int i; 998 const unsigned char *id; 999 const char *el_str; 1000 struct ndb_iterator iter, *it = &iter; 1001 struct ndb_str str; 1002 1003 ndb_tags_iterate_start(note, it); 1004 1005 while (ndb_tags_iterate_next(it)) { 1006 // we're looking for tags with 2 or more entries: ["p", id], etc 1007 if (it->tag->count < 2) 1008 continue; 1009 1010 str = ndb_tag_str(note, it->tag, 0); 1011 1012 // we only care about packed strings (single char, etc) 1013 if (str.flag != NDB_PACKED_STR) 1014 continue; 1015 1016 // do we have #e matching e (or p, etc) 1017 if (str.str[0] != els->field.tag || str.str[1] != 0) 1018 continue; 1019 1020 str = ndb_tag_str(note, it->tag, 1); 1021 1022 switch (els->field.elem_type) { 1023 case NDB_ELEMENT_ID: 1024 // if our filter element type is an id, then we 1025 // expect a packed id in the tag, otherwise skip 1026 if (str.flag != NDB_PACKED_ID) 1027 continue; 1028 break; 1029 case NDB_ELEMENT_STRING: 1030 // if our filter element type is a string, then 1031 // we should not expect an id 1032 if (str.flag == NDB_PACKED_ID) 1033 continue; 1034 1035 break; 1036 case NDB_ELEMENT_UNKNOWN: 1037 default: 1038 // For some reason the element type is not set. It's 1039 // possible nothing was added to the generic filter? 1040 // Let's just fail here and log a note for debugging 1041 fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type); 1042 return 0; 1043 } 1044 1045 for (i = 0; i < els->count; i++) { 1046 switch (els->field.elem_type) { 1047 case NDB_ELEMENT_ID: 1048 id = ndb_filter_get_id_element(filter, els, i); 1049 if (!memcmp(id, str.id, 32)) 1050 return 1; 1051 break; 1052 case NDB_ELEMENT_STRING: 1053 el_str = ndb_filter_get_string_element(filter, els, i); 1054 if (!strcmp(el_str, str.str)) 1055 return 1; 1056 break; 1057 case NDB_ELEMENT_INT: 1058 // int elements int tag queries are not supported 1059 case NDB_ELEMENT_UNKNOWN: 1060 return 0; 1061 } 1062 } 1063 } 1064 1065 return 0; 1066 } 1067 1068 struct search_id_state { 1069 struct ndb_filter *filter; 1070 struct ndb_filter_elements *els; 1071 unsigned char *key; 1072 }; 1073 1074 static int compare_ids(const void *pa, const void *pb) 1075 { 1076 unsigned char *a = *(unsigned char **)pa; 1077 unsigned char *b = *(unsigned char **)pb; 1078 1079 return memcmp(a, b, 32); 1080 } 1081 1082 static int search_ids(const void *ctx, const void *mid_ptr) 1083 { 1084 struct search_id_state *state; 1085 unsigned char *mid_id; 1086 uint32_t mid; 1087 1088 state = (struct search_id_state *)ctx; 1089 mid = *(uint32_t *)mid_ptr; 1090 1091 mid_id = ndb_filter_elements_data(state->filter, mid); 1092 assert(mid_id); 1093 1094 return memcmp(state->key, mid_id, 32); 1095 } 1096 1097 static int compare_kinds(const void *pa, const void *pb) 1098 { 1099 1100 // NOTE: this should match type in `union ndb_filter_element` 1101 uint64_t a = *(uint64_t *)pa; 1102 uint64_t b = *(uint64_t *)pb; 1103 1104 if (a < b) { 1105 return -1; 1106 } else if (a > b) { 1107 return 1; 1108 } else { 1109 return 0; 1110 } 1111 } 1112 1113 // 1114 // returns 1 if a filter matches a note 1115 static int ndb_filter_matches_with(struct ndb_filter *filter, 1116 struct ndb_note *note, int already_matched) 1117 { 1118 int i, j; 1119 struct ndb_filter_elements *els; 1120 struct search_id_state state; 1121 1122 state.filter = filter; 1123 1124 for (i = 0; i < filter->num_elements; i++) { 1125 els = ndb_filter_get_elements(filter, i); 1126 state.els = els; 1127 assert(els); 1128 1129 // if we know we already match from a query scan result, 1130 // we can skip this check 1131 if ((1 << els->field.type) & already_matched) 1132 continue; 1133 1134 switch (els->field.type) { 1135 case NDB_FILTER_KINDS: 1136 for (j = 0; j < els->count; j++) { 1137 if ((unsigned int)els->elements[j] == note->kind) 1138 goto cont; 1139 } 1140 break; 1141 case NDB_FILTER_IDS: 1142 state.key = ndb_note_id(note); 1143 if (bsearch(&state, &els->elements[0], els->count, 1144 sizeof(els->elements[0]), search_ids)) { 1145 continue; 1146 } 1147 break; 1148 case NDB_FILTER_AUTHORS: 1149 state.key = ndb_note_pubkey(note); 1150 if (bsearch(&state, &els->elements[0], els->count, 1151 sizeof(els->elements[0]), search_ids)) { 1152 continue; 1153 } 1154 break; 1155 case NDB_FILTER_TAGS: 1156 if (ndb_tag_filter_matches(filter, els, note)) 1157 continue; 1158 break; 1159 case NDB_FILTER_SINCE: 1160 assert(els->count == 1); 1161 if (note->created_at >= els->elements[0]) 1162 continue; 1163 break; 1164 case NDB_FILTER_UNTIL: 1165 assert(els->count == 1); 1166 if (note->created_at < els->elements[0]) 1167 continue; 1168 break; 1169 case NDB_FILTER_SEARCH: 1170 // TODO: matching search filters will need an accelerated 1171 // data structure, like our minimal perfect hashmap 1172 // idea for mutewords. 1173 // 1174 // We'll also want to store tokenized words in the filter 1175 // itself, so that we can walk over each word and check 1176 // the hashmap to see if the note contains at least 1177 // one word. 1178 // 1179 // For now we always return true, since we assume 1180 // the search index will be walked for these kinds 1181 // of queries. 1182 continue; 1183 case NDB_FILTER_LIMIT: 1184 cont: 1185 continue; 1186 } 1187 1188 // all need to match 1189 return 0; 1190 } 1191 1192 return 1; 1193 } 1194 1195 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 1196 { 1197 return ndb_filter_matches_with(filter, note, 0); 1198 } 1199 1200 // because elements are stored as offsets and qsort doesn't support context, 1201 // we do a dumb thing where we convert elements to pointers and back if we 1202 // are doing an id or string sort 1203 static void sort_filter_elements(struct ndb_filter *filter, 1204 struct ndb_filter_elements *els, 1205 int (*cmp)(const void *, const void *)) 1206 { 1207 int i; 1208 1209 assert(ndb_filter_elem_is_ptr(&els->field)); 1210 1211 for (i = 0; i < els->count; i++) 1212 els->elements[i] += (uint64_t)filter->data_buf.start; 1213 1214 qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp); 1215 1216 for (i = 0; i < els->count; i++) 1217 els->elements[i] -= (uint64_t)filter->data_buf.start; 1218 } 1219 1220 static int ndb_filter_field_eq(struct ndb_filter *a_filt, 1221 struct ndb_filter_elements *a_field, 1222 struct ndb_filter *b_filt, 1223 struct ndb_filter_elements *b_field) 1224 { 1225 int i; 1226 const char *a_str, *b_str; 1227 unsigned char *a_id, *b_id; 1228 uint64_t a_int, b_int; 1229 1230 if (a_field->count != b_field->count) 1231 return 0; 1232 1233 if (a_field->field.type != b_field->field.type) { 1234 ndb_debug("UNUSUAL: field types do not match in ndb_filter_field_eq\n"); 1235 return 0; 1236 } 1237 1238 if (a_field->field.elem_type != b_field->field.elem_type) { 1239 ndb_debug("UNUSUAL: field element types do not match in ndb_filter_field_eq\n"); 1240 return 0; 1241 } 1242 1243 if (a_field->field.elem_type == NDB_ELEMENT_UNKNOWN) { 1244 ndb_debug("UNUSUAL: field element types are unknown\n"); 1245 return 0; 1246 } 1247 1248 for (i = 0; i < a_field->count; i++) { 1249 switch (a_field->field.elem_type) { 1250 case NDB_ELEMENT_UNKNOWN: 1251 return 0; 1252 case NDB_ELEMENT_STRING: 1253 a_str = ndb_filter_get_string_element(a_filt, a_field, i); 1254 b_str = ndb_filter_get_string_element(b_filt, b_field, i); 1255 if (strcmp(a_str, b_str)) 1256 return 0; 1257 break; 1258 case NDB_ELEMENT_ID: 1259 a_id = ndb_filter_get_id_element(a_filt, a_field, i); 1260 b_id = ndb_filter_get_id_element(b_filt, b_field, i); 1261 if (memcmp(a_id, b_id, 32)) 1262 return 0; 1263 break; 1264 case NDB_ELEMENT_INT: 1265 a_int = ndb_filter_get_int_element(a_field, i); 1266 b_int = ndb_filter_get_int_element(b_field, i); 1267 if (a_int != b_int) 1268 return 0; 1269 break; 1270 } 1271 } 1272 1273 return 1; 1274 } 1275 1276 void ndb_filter_end_field(struct ndb_filter *filter) 1277 { 1278 int cur_offset; 1279 struct ndb_filter_elements *cur; 1280 1281 cur_offset = filter->current; 1282 1283 if (!(cur = ndb_filter_current_element(filter))) 1284 return; 1285 1286 filter->elements[filter->num_elements++] = cur_offset; 1287 1288 // sort elements for binary search 1289 switch (cur->field.type) { 1290 case NDB_FILTER_IDS: 1291 case NDB_FILTER_AUTHORS: 1292 sort_filter_elements(filter, cur, compare_ids); 1293 break; 1294 case NDB_FILTER_KINDS: 1295 qsort(&cur->elements[0], cur->count, 1296 sizeof(cur->elements[0]), compare_kinds); 1297 break; 1298 case NDB_FILTER_TAGS: 1299 // TODO: generic tag search sorting 1300 break; 1301 case NDB_FILTER_SINCE: 1302 case NDB_FILTER_UNTIL: 1303 case NDB_FILTER_LIMIT: 1304 case NDB_FILTER_SEARCH: 1305 // don't need to sort these 1306 break; 1307 } 1308 1309 filter->current = -1; 1310 1311 } 1312 1313 static void ndb_filter_group_init(struct ndb_filter_group *group) 1314 { 1315 group->num_filters = 0; 1316 } 1317 1318 static int ndb_filter_group_add(struct ndb_filter_group *group, 1319 struct ndb_filter *filter) 1320 { 1321 if (group->num_filters + 1 > MAX_FILTERS) 1322 return 0; 1323 1324 return ndb_filter_clone(&group->filters[group->num_filters++], filter); 1325 } 1326 1327 static int ndb_filter_group_matches(struct ndb_filter_group *group, 1328 struct ndb_note *note) 1329 { 1330 int i; 1331 struct ndb_filter *filter; 1332 1333 if (group->num_filters == 0) 1334 return 1; 1335 1336 for (i = 0; i < group->num_filters; i++) { 1337 filter = &group->filters[i]; 1338 1339 if (ndb_filter_matches(filter, note)) 1340 return 1; 1341 } 1342 1343 return 0; 1344 } 1345 1346 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 1347 uint64_t timestamp, const char *search) 1348 { 1349 memcpy(key->id, id, 32); 1350 key->timestamp = timestamp; 1351 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1352 key->search[sizeof(key->search) - 1] = '\0'; 1353 } 1354 1355 static int ndb_write_profile_search_index(struct ndb_txn *txn, 1356 struct ndb_search_key *index_key, 1357 uint64_t profile_key) 1358 { 1359 int rc; 1360 MDB_val key, val; 1361 1362 key.mv_data = index_key; 1363 key.mv_size = sizeof(*index_key); 1364 val.mv_data = &profile_key; 1365 val.mv_size = sizeof(profile_key); 1366 1367 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1368 &key, &val, 0))) 1369 { 1370 ndb_debug("ndb_write_profile_search_index failed: %s\n", 1371 mdb_strerror(rc)); 1372 return 0; 1373 } 1374 1375 return 1; 1376 } 1377 1378 1379 // map usernames and display names to profile keys for user searching 1380 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 1381 struct ndb_note *note, 1382 uint64_t profile_key, 1383 void *profile_root) 1384 { 1385 struct ndb_search_key index; 1386 NdbProfileRecord_table_t profile_record; 1387 NdbProfile_table_t profile; 1388 1389 profile_record = NdbProfileRecord_as_root(profile_root); 1390 profile = NdbProfileRecord_profile_get(profile_record); 1391 1392 const char *name = NdbProfile_name_get(profile); 1393 const char *display_name = NdbProfile_display_name_get(profile); 1394 1395 // words + pubkey + created 1396 if (name) { 1397 ndb_make_search_key(&index, note->pubkey, note->created_at, 1398 name); 1399 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1400 return 0; 1401 } 1402 1403 if (display_name) { 1404 // don't write the same name/display_name twice 1405 if (name && !strcmp(display_name, name)) { 1406 return 1; 1407 } 1408 ndb_make_search_key(&index, note->pubkey, note->created_at, 1409 display_name); 1410 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1411 return 0; 1412 } 1413 1414 return 1; 1415 } 1416 1417 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1418 uint64_t timestamp) 1419 { 1420 memcpy(key->id, id, 32); 1421 key->timestamp = timestamp; 1422 } 1423 1424 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1425 { 1426 memcpy(key->id, id, 32); 1427 key->timestamp = 0; 1428 } 1429 1430 static inline void ndb_u64_ts_init(struct ndb_u64_ts *key, uint64_t integer, 1431 uint64_t timestamp) 1432 { 1433 key->u64 = integer; 1434 key->timestamp = timestamp; 1435 } 1436 1437 // useful for range-searching for the latest key with a clustered created_at timen 1438 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1439 { 1440 memcpy(key->id, id, 32); 1441 key->timestamp = UINT64_MAX; 1442 } 1443 1444 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 1445 { 1446 txn->lmdb = &ndb->lmdb; 1447 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 1448 if (!txn->lmdb->env) 1449 return 0; 1450 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 1451 } 1452 1453 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 1454 { 1455 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 1456 } 1457 1458 // this should only be used in migrations, etc 1459 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 1460 { 1461 return _ndb_begin_query(ndb, txn, 0); 1462 } 1463 1464 static int ndb_db_is_index(enum ndb_dbs index) 1465 { 1466 switch (index) { 1467 case NDB_DB_NOTE: 1468 case NDB_DB_META: 1469 case NDB_DB_PROFILE: 1470 case NDB_DB_NOTE_ID: 1471 case NDB_DB_NDB_META: 1472 case NDB_DB_PROFILE_SEARCH: 1473 case NDB_DB_PROFILE_LAST_FETCH: 1474 case NDB_DBS: 1475 return 0; 1476 case NDB_DB_PROFILE_PK: 1477 case NDB_DB_NOTE_KIND: 1478 case NDB_DB_NOTE_TEXT: 1479 case NDB_DB_NOTE_BLOCKS: 1480 case NDB_DB_NOTE_TAGS: 1481 case NDB_DB_NOTE_PUBKEY: 1482 case NDB_DB_NOTE_PUBKEY_KIND: 1483 return 1; 1484 } 1485 1486 return 0; 1487 } 1488 1489 static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key, 1490 unsigned char *id, uint64_t iu64, 1491 uint64_t timestamp) 1492 { 1493 memcpy(key->id, id, 32); 1494 key->u64 = iu64; 1495 key->timestamp = timestamp; 1496 } 1497 1498 static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note, 1499 uint64_t note_key) 1500 { 1501 int rc; 1502 struct ndb_tsid key; 1503 MDB_val k, v; 1504 1505 ndb_tsid_init(&key, ndb_note_pubkey(note), ndb_note_created_at(note)); 1506 1507 k.mv_data = &key; 1508 k.mv_size = sizeof(key); 1509 1510 v.mv_data = ¬e_key; 1511 v.mv_size = sizeof(note_key); 1512 1513 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY], &k, &v, 0))) { 1514 fprintf(stderr, "write note pubkey index failed: %s\n", 1515 mdb_strerror(rc)); 1516 return 0; 1517 } 1518 1519 return 1; 1520 } 1521 1522 static int ndb_write_note_pubkey_kind_index(struct ndb_txn *txn, 1523 struct ndb_note *note, 1524 uint64_t note_key) 1525 { 1526 int rc; 1527 struct ndb_id_u64_ts key; 1528 MDB_val k, v; 1529 1530 ndb_id_u64_ts_init(&key, ndb_note_pubkey(note), ndb_note_kind(note), 1531 ndb_note_created_at(note)); 1532 1533 k.mv_data = &key; 1534 k.mv_size = sizeof(key); 1535 1536 v.mv_data = ¬e_key; 1537 v.mv_size = sizeof(note_key); 1538 1539 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], &k, &v, 0))) { 1540 fprintf(stderr, "write note pubkey_kind index failed: %s\n", 1541 mdb_strerror(rc)); 1542 return 0; 1543 } 1544 1545 return 1; 1546 } 1547 1548 1549 static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, int num_indices) 1550 { 1551 MDB_val k, v; 1552 MDB_cursor *cur; 1553 int i, drop_dbi, count, rc; 1554 uint64_t note_key; 1555 struct ndb_note *note; 1556 enum ndb_dbs index; 1557 1558 // 0 means empty, not delete the dbi 1559 drop_dbi = 0; 1560 1561 // ensure they are all index dbs 1562 for (i = 0; i < num_indices; i++) { 1563 if (!ndb_db_is_index(indices[i])) { 1564 fprintf(stderr, "ndb_rebuild_note_index: %s is not an index db\n", ndb_db_name(indices[i])); 1565 return -1; 1566 } 1567 } 1568 1569 // empty the index dbs before we rebuild 1570 for (i = 0; i < num_indices; i++) { 1571 index = indices[i]; 1572 if (mdb_drop(txn->mdb_txn, index, drop_dbi)) { 1573 fprintf(stderr, "ndb_rebuild_pubkey_index: mdb_drop failed for %s\n", ndb_db_name(index)); 1574 return -1; 1575 } 1576 } 1577 1578 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE], &cur))) { 1579 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1580 return -1; 1581 } 1582 1583 count = 0; 1584 1585 // loop through all notes and write search indices 1586 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1587 note = v.mv_data; 1588 note_key = *((uint64_t*)k.mv_data); 1589 1590 for (i = 0; i < num_indices; i++) { 1591 index = indices[i]; 1592 switch (index) { 1593 case NDB_DB_NOTE: 1594 case NDB_DB_META: 1595 case NDB_DB_PROFILE: 1596 case NDB_DB_NOTE_ID: 1597 case NDB_DB_NDB_META: 1598 case NDB_DB_PROFILE_SEARCH: 1599 case NDB_DB_PROFILE_LAST_FETCH: 1600 case NDB_DBS: 1601 // this should never happen since we check at 1602 // the start 1603 count = -1; 1604 goto cleanup; 1605 case NDB_DB_PROFILE_PK: 1606 case NDB_DB_NOTE_KIND: 1607 case NDB_DB_NOTE_TEXT: 1608 case NDB_DB_NOTE_BLOCKS: 1609 case NDB_DB_NOTE_TAGS: 1610 fprintf(stderr, "%s index rebuild not supported yet. sorry.\n", ndb_db_name(index)); 1611 count = -1; 1612 goto cleanup; 1613 case NDB_DB_NOTE_PUBKEY: 1614 if (!ndb_write_note_pubkey_index(txn, note, note_key)) { 1615 count = -1; 1616 goto cleanup; 1617 } 1618 break; 1619 case NDB_DB_NOTE_PUBKEY_KIND: 1620 if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) { 1621 count = -1; 1622 goto cleanup; 1623 } 1624 break; 1625 } 1626 } 1627 1628 count++; 1629 } 1630 1631 cleanup: 1632 mdb_cursor_close(cur); 1633 1634 return count; 1635 } 1636 1637 1638 // Migrations 1639 // 1640 1641 // This was before we had note_profile_pubkey{,_kind} indices. Let's create them. 1642 static int ndb_migrate_profile_indices(struct ndb_txn *txn) 1643 { 1644 int count; 1645 1646 enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; 1647 if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) { 1648 fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); 1649 return 1; 1650 } else { 1651 fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); 1652 return 0; 1653 } 1654 } 1655 1656 static int ndb_migrate_user_search_indices(struct ndb_txn *txn) 1657 { 1658 int rc; 1659 MDB_cursor *cur; 1660 MDB_val k, v; 1661 void *profile_root; 1662 NdbProfileRecord_table_t record; 1663 struct ndb_note *note; 1664 uint64_t note_key, profile_key; 1665 size_t len; 1666 int count; 1667 1668 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1669 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1670 return 0; 1671 } 1672 1673 count = 0; 1674 1675 // loop through all profiles and write search indices 1676 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1677 profile_root = v.mv_data; 1678 profile_key = *((uint64_t*)k.mv_data); 1679 record = NdbProfileRecord_as_root(profile_root); 1680 note_key = NdbProfileRecord_note_key(record); 1681 note = ndb_get_note_by_key(txn, note_key, &len); 1682 1683 if (note == NULL) { 1684 continue; 1685 } 1686 1687 if (!ndb_write_profile_search_indices(txn, note, profile_key, 1688 profile_root)) { 1689 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 1690 continue; 1691 } 1692 1693 count++; 1694 } 1695 1696 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 1697 1698 mdb_cursor_close(cur); 1699 1700 return 1; 1701 } 1702 1703 static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn) 1704 { 1705 // just drop the search db so we can rebuild it 1706 if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) { 1707 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 1708 return 0; 1709 } 1710 1711 return ndb_migrate_user_search_indices(txn); 1712 } 1713 1714 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 1715 1716 1717 int ndb_db_version(struct ndb_txn *txn) 1718 { 1719 uint64_t version, version_key; 1720 MDB_val k, v; 1721 1722 version_key = NDB_META_KEY_VERSION; 1723 k.mv_data = &version_key; 1724 k.mv_size = sizeof(version_key); 1725 1726 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) { 1727 version = -1; 1728 } else { 1729 if (v.mv_size != 8) { 1730 fprintf(stderr, "run_migrations: invalid version size?"); 1731 return 0; 1732 } 1733 version = *((uint64_t*)v.mv_data); 1734 } 1735 1736 return version; 1737 } 1738 1739 // custom pubkey+kind+timestamp comparison function. This is used by lmdb to 1740 // perform b+ tree searches over the pubkey+kind+timestamp index 1741 static int ndb_id_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1742 { 1743 struct ndb_id_u64_ts *tsa, *tsb; 1744 MDB_val a2 = *a, b2 = *b; 1745 1746 a2.mv_size = sizeof(tsa->id); 1747 b2.mv_size = sizeof(tsb->id); 1748 1749 int cmp = mdb_cmp_memn(&a2, &b2); 1750 if (cmp) return cmp; 1751 1752 tsa = a->mv_data; 1753 tsb = b->mv_data; 1754 1755 if (tsa->u64 < tsb->u64) 1756 return -1; 1757 else if (tsa->u64 > tsb->u64) 1758 return 1; 1759 1760 if (tsa->timestamp < tsb->timestamp) 1761 return -1; 1762 else if (tsa->timestamp > tsb->timestamp) 1763 return 1; 1764 1765 return 0; 1766 } 1767 1768 // custom kind+timestamp comparison function. This is used by lmdb to perform 1769 // b+ tree searches over the kind+timestamp index 1770 static int ndb_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1771 { 1772 struct ndb_u64_ts *tsa, *tsb; 1773 tsa = a->mv_data; 1774 tsb = b->mv_data; 1775 1776 if (tsa->u64 < tsb->u64) 1777 return -1; 1778 else if (tsa->u64 > tsb->u64) 1779 return 1; 1780 1781 if (tsa->timestamp < tsb->timestamp) 1782 return -1; 1783 else if (tsa->timestamp > tsb->timestamp) 1784 return 1; 1785 1786 return 0; 1787 } 1788 1789 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1790 { 1791 struct ndb_tsid *tsa, *tsb; 1792 MDB_val a2 = *a, b2 = *b; 1793 1794 a2.mv_size = sizeof(tsa->id); 1795 b2.mv_size = sizeof(tsb->id); 1796 1797 int cmp = mdb_cmp_memn(&a2, &b2); 1798 if (cmp) return cmp; 1799 1800 tsa = a->mv_data; 1801 tsb = b->mv_data; 1802 1803 if (tsa->timestamp < tsb->timestamp) 1804 return -1; 1805 else if (tsa->timestamp > tsb->timestamp) 1806 return 1; 1807 return 0; 1808 } 1809 1810 enum ndb_ingester_msgtype { 1811 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1812 NDB_INGEST_QUIT, // kill ingester thread immediately 1813 }; 1814 1815 struct ndb_ingester_event { 1816 char *json; 1817 unsigned client : 1; // ["EVENT", {...}] messages 1818 unsigned len : 31; 1819 }; 1820 1821 struct ndb_writer_note { 1822 struct ndb_note *note; 1823 size_t note_len; 1824 }; 1825 1826 struct ndb_writer_profile { 1827 struct ndb_writer_note note; 1828 struct ndb_profile_record_builder record; 1829 }; 1830 1831 struct ndb_ingester_msg { 1832 enum ndb_ingester_msgtype type; 1833 union { 1834 struct ndb_ingester_event event; 1835 }; 1836 }; 1837 1838 struct ndb_writer_ndb_meta { 1839 // these are 64 bit because I'm paranoid of db-wide alignment issues 1840 uint64_t version; 1841 }; 1842 1843 // Used in the writer thread when writing ndb_profile_fetch_record's 1844 // kv = pubkey: recor 1845 struct ndb_writer_last_fetch { 1846 unsigned char pubkey[32]; 1847 uint64_t fetched_at; 1848 }; 1849 1850 // write note blocks 1851 struct ndb_writer_blocks { 1852 struct ndb_blocks *blocks; 1853 uint64_t note_key; 1854 }; 1855 1856 // The different types of messages that the writer thread can write to the 1857 // database 1858 struct ndb_writer_msg { 1859 enum ndb_writer_msgtype type; 1860 union { 1861 struct ndb_writer_note note; 1862 struct ndb_writer_profile profile; 1863 struct ndb_writer_ndb_meta ndb_meta; 1864 struct ndb_writer_last_fetch last_fetch; 1865 struct ndb_writer_blocks blocks; 1866 }; 1867 }; 1868 1869 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 1870 struct ndb_writer_msg *msg) 1871 { 1872 return prot_queue_push(&writer->inbox, msg); 1873 } 1874 1875 static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags); 1876 static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn) 1877 { 1878 int rc; 1879 MDB_cursor *cur; 1880 MDB_val k, v; 1881 void *profile_root; 1882 NdbProfileRecord_table_t record; 1883 struct ndb_note *note, *copied_note; 1884 uint64_t note_key; 1885 size_t len; 1886 int count, failed, ret; 1887 struct ndb_writer_profile profile; 1888 1889 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1890 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 1891 return 0; 1892 } 1893 1894 size_t scratch_size = 8 * 1024 * 1024; 1895 unsigned char *scratch = malloc(scratch_size); 1896 1897 ret = 1; 1898 count = 0; 1899 failed = 0; 1900 1901 // loop through all profiles and write search indices 1902 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1903 profile_root = v.mv_data; 1904 record = NdbProfileRecord_as_root(profile_root); 1905 note_key = NdbProfileRecord_note_key(record); 1906 note = ndb_get_note_by_key(txn, note_key, &len); 1907 1908 if (note == NULL) { 1909 failed++; 1910 continue; 1911 } 1912 1913 struct ndb_profile_record_builder *b = &profile.record; 1914 1915 // reprocess profile 1916 if (!ndb_process_profile_note(note, b)) { 1917 failed++; 1918 continue; 1919 } 1920 1921 // the writer needs to own this note, and its expected to free it 1922 copied_note = malloc(len); 1923 memcpy(copied_note, note, len); 1924 1925 profile.note.note = copied_note; 1926 profile.note.note_len = len; 1927 1928 // we don't pass in flags when migrating... a bit sketchy but 1929 // whatever. noone is using this to customize nostrdb atm 1930 if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) { 1931 count++; 1932 } 1933 } 1934 1935 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 1936 1937 if (failed != 0) { 1938 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 1939 } 1940 1941 free(scratch); 1942 mdb_cursor_close(cur); 1943 1944 return ret; 1945 } 1946 1947 static struct ndb_migration MIGRATIONS[] = { 1948 { .fn = ndb_migrate_user_search_indices }, 1949 { .fn = ndb_migrate_lower_user_search_indices }, 1950 { .fn = ndb_migrate_utf8_profile_names }, 1951 { .fn = ndb_migrate_profile_indices }, 1952 }; 1953 1954 1955 int ndb_end_query(struct ndb_txn *txn) 1956 { 1957 // this works on read or write queries. 1958 return mdb_txn_commit(txn->mdb_txn) == 0; 1959 } 1960 1961 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 1962 unsigned char sig[64]) 1963 { 1964 secp256k1_xonly_pubkey xonly_pubkey; 1965 int ok; 1966 1967 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 1968 pubkey) != 0; 1969 if (!ok) return 0; 1970 1971 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 1972 &xonly_pubkey) > 0; 1973 if (!ok) return 0; 1974 1975 return 1; 1976 } 1977 1978 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 1979 const unsigned char *pubkey, 1980 uint64_t fetched_at) 1981 { 1982 int rc; 1983 MDB_val key, val; 1984 1985 key.mv_data = (unsigned char*)pubkey; 1986 key.mv_size = 32; 1987 val.mv_data = &fetched_at; 1988 val.mv_size = sizeof(fetched_at); 1989 1990 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 1991 &key, &val, 0))) 1992 { 1993 ndb_debug("write version to ndb_meta failed: %s\n", 1994 mdb_strerror(rc)); 1995 return; 1996 } 1997 1998 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 1999 } 2000 2001 2002 // We just received a profile that we haven't processed yet, but it could 2003 // be an older one! Make sure we only write last fetched profile if it's a new 2004 // one 2005 // 2006 // To do this, we first check the latest profile in the database. If the 2007 // created_date for this profile note is newer, then we write a 2008 // last_profile_fetch record, otherwise we do not. 2009 // 2010 // WARNING: This function is only valid when called from the writer thread 2011 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 2012 struct ndb_note *note) 2013 { 2014 size_t len; 2015 uint64_t profile_key, note_key; 2016 void *root; 2017 struct ndb_note *last_profile; 2018 NdbProfileRecord_table_t record; 2019 2020 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 2021 record = NdbProfileRecord_as_root(root); 2022 note_key = NdbProfileRecord_note_key(record); 2023 last_profile = ndb_get_note_by_key(txn, note_key, &len); 2024 if (last_profile == NULL) { 2025 return 0; 2026 } 2027 2028 // found profile, let's see if it's newer than ours 2029 if (note->created_at > last_profile->created_at) { 2030 // this is a new profile note, record last fetched time 2031 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 2032 } 2033 } else { 2034 // couldn't fetch profile. record last fetched time 2035 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 2036 } 2037 2038 return 1; 2039 } 2040 2041 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 2042 uint64_t fetched_at) 2043 { 2044 struct ndb_writer_msg msg; 2045 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 2046 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 2047 msg.last_fetch.fetched_at = fetched_at; 2048 2049 return ndb_writer_queue_msg(&ndb->writer, &msg); 2050 } 2051 2052 2053 // When doing cursor scans from greatest to lowest, this function positions the 2054 // cursor at the first element before descending. MDB_SET_RANGE puts us right 2055 // after the first element, so we have to go back one. 2056 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v) 2057 { 2058 // Position cursor at the next key greater than or equal to the 2059 // specified key 2060 if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) { 2061 // Failed :(. It could be the last element? 2062 if (mdb_cursor_get(cur, k, v, MDB_LAST)) 2063 return 0; 2064 } else { 2065 // if set range worked and our key exists, it should be 2066 // the one right before this one 2067 if (mdb_cursor_get(cur, k, v, MDB_PREV)) 2068 return 0; 2069 } 2070 2071 return 1; 2072 } 2073 2074 // get some value based on a clustered id key 2075 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 2076 MDB_val *val) 2077 { 2078 MDB_val k, v; 2079 MDB_cursor *cur; 2080 int success = 0, rc; 2081 struct ndb_tsid tsid; 2082 2083 // position at the most recent 2084 ndb_tsid_high(&tsid, id); 2085 2086 k.mv_data = &tsid; 2087 k.mv_size = sizeof(tsid); 2088 2089 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 2090 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 2091 return 0; 2092 } 2093 2094 if (!ndb_cursor_start(cur, &k, &v)) 2095 goto cleanup; 2096 2097 if (memcmp(k.mv_data, id, 32) == 0) { 2098 *val = v; 2099 success = 1; 2100 } 2101 2102 cleanup: 2103 mdb_cursor_close(cur); 2104 return success; 2105 } 2106 2107 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 2108 enum ndb_dbs store, size_t *len) 2109 { 2110 MDB_val k, v; 2111 2112 k.mv_data = &key; 2113 k.mv_size = sizeof(key); 2114 2115 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2116 ndb_debug("ndb_lookup_by_key: mdb_get note failed\n"); 2117 return NULL; 2118 } 2119 2120 if (len) 2121 *len = v.mv_size; 2122 2123 return v.mv_data; 2124 } 2125 2126 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 2127 enum ndb_dbs store, const unsigned char *pk, 2128 size_t *len, uint64_t *primkey) 2129 { 2130 MDB_val k, v; 2131 void *res = NULL; 2132 if (len) 2133 *len = 0; 2134 2135 if (!ndb_get_tsid(txn, ind, pk, &k)) { 2136 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 2137 return 0; 2138 } 2139 2140 if (primkey) 2141 *primkey = *(uint64_t*)k.mv_data; 2142 2143 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2144 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 2145 return 0; 2146 } 2147 2148 res = v.mv_data; 2149 assert(((uint64_t)res % 4) == 0); 2150 if (len) 2151 *len = v.mv_size; 2152 return res; 2153 } 2154 2155 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 2156 { 2157 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 2158 } 2159 2160 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 2161 { 2162 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key); 2163 } 2164 2165 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 2166 enum ndb_dbs db, 2167 const unsigned char *id) 2168 { 2169 MDB_val k; 2170 2171 if (!ndb_get_tsid(txn, db, id, &k)) 2172 return 0; 2173 2174 return *(uint32_t*)k.mv_data; 2175 } 2176 2177 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 2178 { 2179 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 2180 } 2181 2182 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 2183 { 2184 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 2185 } 2186 2187 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2188 { 2189 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 2190 } 2191 2192 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2193 { 2194 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 2195 } 2196 2197 uint64_t 2198 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 2199 { 2200 MDB_val k, v; 2201 2202 k.mv_data = (unsigned char*)pubkey; 2203 k.mv_size = 32; 2204 2205 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 2206 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 2207 return 0; 2208 } 2209 2210 return *((uint64_t*)v.mv_data); 2211 } 2212 2213 2214 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 2215 { 2216 MDB_val val; 2217 2218 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 2219 return 0; 2220 2221 return 1; 2222 } 2223 2224 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 2225 MDB_txn *mdb_txn) 2226 { 2227 txn->lmdb = lmdb; 2228 txn->mdb_txn = mdb_txn; 2229 } 2230 2231 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 2232 { 2233 unsigned char id[32]; 2234 struct ndb_ingest_controller *c = data; 2235 struct ndb_txn txn; 2236 2237 hex_decode(hexid, 64, id, sizeof(id)); 2238 2239 // let's see if we already have it 2240 2241 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 2242 if (!ndb_has_note(&txn, id)) 2243 return NDB_IDRES_CONT; 2244 2245 return NDB_IDRES_STOP; 2246 } 2247 2248 static int ndbprofile_parse_json(flatcc_builder_t *B, 2249 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 2250 { 2251 flatcc_json_parser_t parser, *ctx = &parser; 2252 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 2253 2254 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 2255 return 0; 2256 2257 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 2258 if (ctx->error) 2259 return 0; 2260 2261 if (!flatcc_builder_end_buffer(B, *profile)) 2262 return 0; 2263 2264 ctx->end_loc = buf; 2265 2266 2267 return 1; 2268 } 2269 2270 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 2271 { 2272 b->builder = malloc(sizeof(*b->builder)); 2273 b->flatbuf = NULL; 2274 } 2275 2276 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 2277 { 2278 if (b->builder) 2279 free(b->builder); 2280 if (b->flatbuf) 2281 free(b->flatbuf); 2282 2283 b->builder = NULL; 2284 b->flatbuf = NULL; 2285 } 2286 2287 int ndb_process_profile_note(struct ndb_note *note, 2288 struct ndb_profile_record_builder *profile) 2289 { 2290 int res; 2291 2292 NdbProfile_ref_t profile_table; 2293 flatcc_builder_t *builder; 2294 2295 ndb_profile_record_builder_init(profile); 2296 builder = profile->builder; 2297 flatcc_builder_init(builder); 2298 2299 NdbProfileRecord_start_as_root(builder); 2300 2301 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 2302 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 2303 note->content_length, 2304 flatcc_json_parser_f_skip_unknown, 2305 &profile_table))) 2306 { 2307 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 2308 note->content_length, ndb_note_content(note)); 2309 ndb_profile_record_builder_free(profile); 2310 return 0; 2311 } 2312 2313 uint64_t received_at = time(NULL); 2314 const char *lnurl = "fixme"; 2315 2316 NdbProfileRecord_profile_add(builder, profile_table); 2317 NdbProfileRecord_received_at_add(builder, received_at); 2318 2319 flatcc_builder_ref_t lnurl_off; 2320 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 2321 2322 NdbProfileRecord_lnurl_add(builder, lnurl_off); 2323 2324 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 2325 return 1; 2326 } 2327 2328 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 2329 char *json, unsigned len, unsigned client) 2330 { 2331 struct ndb_ingester_msg msg; 2332 msg.type = NDB_INGEST_EVENT; 2333 2334 msg.event.json = json; 2335 msg.event.len = len; 2336 msg.event.client = client; 2337 2338 return threadpool_dispatch(&ingester->tp, &msg); 2339 } 2340 2341 2342 static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json, 2343 int len, unsigned client) 2344 { 2345 // Without this, we get bus errors in the json parser inside when 2346 // trying to ingest empty kind 6 reposts... we should probably do fuzz 2347 // testing on inputs to the json parser 2348 if (len == 0) 2349 return 0; 2350 2351 // Since we need to return as soon as possible, and we're not 2352 // making any assumptions about the lifetime of the string, we 2353 // definitely need to copy the json here. In the future once we 2354 // have our thread that manages a websocket connection, we can 2355 // avoid the copy and just use the buffer we get from that 2356 // thread. 2357 char *json_copy = strdupn(json, len); 2358 if (json_copy == NULL) 2359 return 0; 2360 2361 return ndb_ingester_queue_event(ingester, json_copy, len, client); 2362 } 2363 2364 2365 static int ndb_ingester_process_note(secp256k1_context *ctx, 2366 struct ndb_note *note, 2367 size_t note_size, 2368 struct ndb_writer_msg *out, 2369 struct ndb_ingester *ingester) 2370 { 2371 enum ndb_ingest_filter_action action; 2372 action = NDB_INGEST_ACCEPT; 2373 2374 if (ingester->filter) 2375 action = ingester->filter(ingester->filter_context, note); 2376 2377 if (action == NDB_INGEST_REJECT) 2378 return 0; 2379 2380 // some special situations we might want to skip sig validation, 2381 // like during large imports 2382 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 2383 // if we're skipping validation we don't need to verify 2384 } else { 2385 // verify! If it's an invalid note we don't need to 2386 // bother writing it to the database 2387 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 2388 ndb_debug("signature verification failed\n"); 2389 return 0; 2390 } 2391 } 2392 2393 // we didn't find anything. let's send it 2394 // to the writer thread 2395 note = realloc(note, note_size); 2396 assert(((uint64_t)note % 4) == 0); 2397 2398 if (note->kind == 0) { 2399 struct ndb_profile_record_builder *b = 2400 &out->profile.record; 2401 2402 ndb_process_profile_note(note, b); 2403 2404 out->type = NDB_WRITER_PROFILE; 2405 out->profile.note.note = note; 2406 out->profile.note.note_len = note_size; 2407 return 1; 2408 } else if (note->kind == 6) { 2409 // process the repost if we have a repost event 2410 ndb_debug("processing kind 6 repost\n"); 2411 ndb_ingest_event(ingester, ndb_note_content(note), 2412 ndb_note_content_length(note), 0); 2413 } 2414 2415 out->type = NDB_WRITER_NOTE; 2416 out->note.note = note; 2417 out->note.note_len = note_size; 2418 2419 return 1; 2420 } 2421 2422 2423 static int ndb_ingester_process_event(secp256k1_context *ctx, 2424 struct ndb_ingester *ingester, 2425 struct ndb_ingester_event *ev, 2426 struct ndb_writer_msg *out, 2427 MDB_txn *read_txn 2428 ) 2429 { 2430 struct ndb_tce tce; 2431 struct ndb_fce fce; 2432 struct ndb_note *note; 2433 struct ndb_ingest_controller controller; 2434 struct ndb_id_cb cb; 2435 void *buf; 2436 int ok; 2437 size_t bufsize, note_size; 2438 2439 ok = 0; 2440 2441 // we will use this to check if we already have it in the DB during 2442 // ID parsing 2443 controller.read_txn = read_txn; 2444 controller.lmdb = ingester->lmdb; 2445 cb.fn = ndb_ingester_json_controller; 2446 cb.data = &controller; 2447 2448 // since we're going to be passing this allocated note to a different 2449 // thread, we can't use thread-local buffers. just allocate a block 2450 bufsize = max(ev->len * 8.0, 4096); 2451 buf = malloc(bufsize); 2452 if (!buf) { 2453 ndb_debug("couldn't malloc buf\n"); 2454 return 0; 2455 } 2456 2457 note_size = 2458 ev->client ? 2459 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 2460 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 2461 2462 if ((int)note_size == -42) { 2463 // we already have this! 2464 //ndb_debug("already have id??\n"); 2465 goto cleanup; 2466 } else if (note_size == 0) { 2467 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 2468 goto cleanup; 2469 } 2470 2471 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 2472 2473 if (ev->client) { 2474 switch (fce.evtype) { 2475 case NDB_FCE_EVENT: 2476 note = fce.event.note; 2477 if (note != buf) { 2478 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2479 goto cleanup; 2480 } 2481 2482 if (!ndb_ingester_process_note(ctx, note, note_size, 2483 out, ingester)) { 2484 ndb_debug("failed to process note\n"); 2485 goto cleanup; 2486 } else { 2487 // we're done with the original json, free it 2488 free(ev->json); 2489 return 1; 2490 } 2491 } 2492 } else { 2493 switch (tce.evtype) { 2494 case NDB_TCE_AUTH: goto cleanup; 2495 case NDB_TCE_NOTICE: goto cleanup; 2496 case NDB_TCE_EOSE: goto cleanup; 2497 case NDB_TCE_OK: goto cleanup; 2498 case NDB_TCE_EVENT: 2499 note = tce.event.note; 2500 if (note != buf) { 2501 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2502 goto cleanup; 2503 } 2504 2505 if (!ndb_ingester_process_note(ctx, note, note_size, 2506 out, ingester)) { 2507 ndb_debug("failed to process note\n"); 2508 goto cleanup; 2509 } else { 2510 // we're done with the original json, free it 2511 free(ev->json); 2512 return 1; 2513 } 2514 } 2515 } 2516 2517 2518 cleanup: 2519 free(ev->json); 2520 free(buf); 2521 2522 return ok; 2523 } 2524 2525 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 2526 { 2527 MDB_cursor *mc; 2528 MDB_val key, val; 2529 2530 if (mdb_cursor_open(txn, db, &mc)) 2531 return 0; 2532 2533 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 2534 mdb_cursor_close(mc); 2535 return 0; 2536 } 2537 2538 mdb_cursor_close(mc); 2539 2540 assert(key.mv_size == 8); 2541 return *((uint64_t*)key.mv_data); 2542 } 2543 2544 // 2545 // make a search key meant for user queries without any other note info 2546 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 2547 { 2548 memset(key->id, 0, sizeof(key->id)); 2549 key->timestamp = 0; 2550 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 2551 key->search[sizeof(key->search) - 1] = '\0'; 2552 } 2553 2554 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 2555 { 2556 int rc; 2557 struct ndb_search_key s; 2558 MDB_val k, v; 2559 search->cursor = NULL; 2560 2561 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 2562 2563 ndb_make_search_key_low(&s, query); 2564 2565 k.mv_data = &s; 2566 k.mv_size = sizeof(s); 2567 2568 if ((rc = mdb_cursor_open(txn->mdb_txn, 2569 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 2570 cursor))) { 2571 printf("search_profile: cursor opened failed: %s\n", 2572 mdb_strerror(rc)); 2573 return 0; 2574 } 2575 2576 // Position cursor at the next key greater than or equal to the specified key 2577 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 2578 printf("search_profile: cursor get failed\n"); 2579 goto cleanup; 2580 } else { 2581 search->key = k.mv_data; 2582 assert(v.mv_size == 8); 2583 search->profile_key = *((uint64_t*)v.mv_data); 2584 return 1; 2585 } 2586 2587 cleanup: 2588 mdb_cursor_close(search->cursor); 2589 search->cursor = NULL; 2590 return 0; 2591 } 2592 2593 void ndb_search_profile_end(struct ndb_search *search) 2594 { 2595 if (search->cursor) 2596 mdb_cursor_close(search->cursor); 2597 } 2598 2599 int ndb_search_profile_next(struct ndb_search *search) 2600 { 2601 int rc; 2602 MDB_val k, v; 2603 unsigned char *init_id; 2604 2605 init_id = search->key->id; 2606 k.mv_data = search->key; 2607 k.mv_size = sizeof(*search->key); 2608 2609 retry: 2610 if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) { 2611 ndb_debug("ndb_search_profile_next: %s\n", 2612 mdb_strerror(rc)); 2613 return 0; 2614 } else { 2615 search->key = k.mv_data; 2616 assert(v.mv_size == 8); 2617 search->profile_key = *((uint64_t*)v.mv_data); 2618 2619 // skip duplicate pubkeys 2620 if (!memcmp(init_id, search->key->id, 32)) 2621 goto retry; 2622 } 2623 2624 return 1; 2625 } 2626 2627 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 2628 { 2629 int cmp; 2630 struct ndb_search_key *ska, *skb; 2631 2632 ska = a->mv_data; 2633 skb = b->mv_data; 2634 2635 MDB_val a2 = *a; 2636 MDB_val b2 = *b; 2637 2638 a2.mv_data = ska->search; 2639 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 2640 2641 cmp = mdb_cmp_memn(&a2, &b2); 2642 if (cmp) return cmp; 2643 2644 if (ska->timestamp < skb->timestamp) 2645 return -1; 2646 else if (ska->timestamp > skb->timestamp) 2647 return 1; 2648 2649 return 0; 2650 } 2651 2652 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 2653 2654 { 2655 MDB_val key, val; 2656 int rc; 2657 struct ndb_tsid tsid; 2658 MDB_dbi pk_db; 2659 2660 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 2661 2662 // write profile_pk + created_at index 2663 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 2664 2665 key.mv_data = &tsid; 2666 key.mv_size = sizeof(tsid); 2667 val.mv_data = &profile_key; 2668 val.mv_size = sizeof(profile_key); 2669 2670 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 2671 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 2672 profile_key, mdb_strerror(rc)); 2673 return 0; 2674 } 2675 2676 return 1; 2677 } 2678 2679 static int ndb_write_profile(struct ndb_txn *txn, 2680 struct ndb_writer_profile *profile, 2681 uint64_t note_key) 2682 { 2683 uint64_t profile_key; 2684 struct ndb_note *note; 2685 void *flatbuf; 2686 size_t flatbuf_len; 2687 int rc; 2688 2689 MDB_val key, val; 2690 MDB_dbi profile_db; 2691 2692 note = profile->note.note; 2693 2694 // add note_key to profile record 2695 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 2696 NdbProfileRecord_end_as_root(profile->record.builder); 2697 2698 flatbuf = profile->record.flatbuf = 2699 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 2700 2701 assert(((uint64_t)flatbuf % 8) == 0); 2702 2703 // TODO: this may not be safe!? 2704 flatbuf_len = (flatbuf_len + 7) & ~7; 2705 2706 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 2707 2708 // get dbs 2709 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 2710 2711 // get new key 2712 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 2713 2714 // write profile to profile store 2715 key.mv_data = &profile_key; 2716 key.mv_size = sizeof(profile_key); 2717 val.mv_data = flatbuf; 2718 val.mv_size = flatbuf_len; 2719 2720 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 2721 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 2722 return 0; 2723 } 2724 2725 // write last fetched record 2726 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 2727 ndb_debug("failed to write last profile fetched record\n"); 2728 } 2729 2730 // write profile pubkey index 2731 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 2732 ndb_debug("failed to write profile pubkey index\n"); 2733 return 0; 2734 } 2735 2736 // write name, display_name profile search indices 2737 if (!ndb_write_profile_search_indices(txn, note, profile_key, 2738 flatbuf)) { 2739 ndb_debug("failed to write profile search indices\n"); 2740 return 0; 2741 } 2742 2743 return 1; 2744 } 2745 2746 // find the last id tag in a note (e, p, etc) 2747 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 2748 { 2749 unsigned char *last = NULL; 2750 struct ndb_iterator iter; 2751 struct ndb_str str; 2752 2753 // get the liked event id (last id) 2754 ndb_tags_iterate_start(note, &iter); 2755 2756 while (ndb_tags_iterate_next(&iter)) { 2757 if (iter.tag->count < 2) 2758 continue; 2759 2760 str = ndb_tag_str(note, iter.tag, 0); 2761 2762 // assign liked to the last e tag 2763 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 2764 str = ndb_tag_str(note, iter.tag, 1); 2765 if (str.flag == NDB_PACKED_ID) 2766 last = str.id; 2767 } 2768 } 2769 2770 return last; 2771 } 2772 2773 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 2774 { 2775 MDB_val k, v; 2776 2777 k.mv_data = (unsigned char*)id; 2778 k.mv_size = 32; 2779 2780 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 2781 //ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 2782 return NULL; 2783 } 2784 2785 if (len) 2786 *len = v.mv_size; 2787 2788 return v.mv_data; 2789 } 2790 2791 // When receiving a reaction note, look for the liked id and increase the 2792 // reaction counter in the note metadata database 2793 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 2794 { 2795 size_t len; 2796 void *root; 2797 int reactions, rc; 2798 MDB_val key, val; 2799 NdbEventMeta_table_t meta; 2800 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 2801 2802 if (liked == NULL) 2803 return 0; 2804 2805 root = ndb_get_note_meta(txn, liked, &len); 2806 2807 flatcc_builder_t builder; 2808 flatcc_builder_init(&builder); 2809 NdbEventMeta_start_as_root(&builder); 2810 2811 // no meta record, let's make one 2812 if (root == NULL) { 2813 NdbEventMeta_reactions_add(&builder, 1); 2814 } else { 2815 // clone existing and add to it 2816 meta = NdbEventMeta_as_root(root); 2817 2818 reactions = NdbEventMeta_reactions_get(meta); 2819 NdbEventMeta_clone(&builder, meta); 2820 NdbEventMeta_reactions_add(&builder, reactions + 1); 2821 } 2822 2823 NdbProfileRecord_end_as_root(&builder); 2824 root = flatcc_builder_finalize_aligned_buffer(&builder, &len); 2825 assert(((uint64_t)root % 8) == 0); 2826 2827 if (root == NULL) { 2828 ndb_debug("failed to create note metadata record\n"); 2829 return 0; 2830 } 2831 2832 // metadata is keyed on id because we want to collect stats regardless 2833 // if we have the note yet or not 2834 key.mv_data = liked; 2835 key.mv_size = 32; 2836 2837 val.mv_data = root; 2838 val.mv_size = len; 2839 2840 // write the new meta record 2841 //ndb_debug("writing stats record for "); 2842 //print_hex(liked, 32); 2843 //ndb_debug("\n"); 2844 2845 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 2846 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 2847 free(root); 2848 return 0; 2849 } 2850 2851 free(root); 2852 2853 return 1; 2854 } 2855 2856 2857 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 2858 uint64_t note_key) 2859 2860 { 2861 struct ndb_tsid tsid; 2862 int rc; 2863 MDB_val key, val; 2864 MDB_dbi id_db; 2865 2866 ndb_tsid_init(&tsid, note->id, note->created_at); 2867 2868 key.mv_data = &tsid; 2869 key.mv_size = sizeof(tsid); 2870 val.mv_data = ¬e_key; 2871 val.mv_size = sizeof(note_key); 2872 2873 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2874 2875 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 2876 ndb_debug("write note id index to db failed: %s\n", 2877 mdb_strerror(rc)); 2878 return 0; 2879 } 2880 2881 return 1; 2882 } 2883 2884 static int ndb_filter_group_add_filters(struct ndb_filter_group *group, 2885 struct ndb_filter *filters, 2886 int num_filters) 2887 { 2888 int i; 2889 2890 for (i = 0; i < num_filters; i++) { 2891 if (!ndb_filter_group_add(group, &filters[i])) 2892 return 0; 2893 } 2894 2895 return 1; 2896 } 2897 2898 2899 static struct ndb_filter_elements * 2900 ndb_filter_find_elements(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2901 { 2902 int i; 2903 struct ndb_filter_elements *els; 2904 2905 for (i = 0; i < filter->num_elements; i++) { 2906 els = ndb_filter_get_elements(filter, i); 2907 assert(els); 2908 if (els->field.type == typ) { 2909 return els; 2910 } 2911 } 2912 2913 return NULL; 2914 } 2915 2916 static const char *ndb_filter_find_search(struct ndb_filter *filter) 2917 { 2918 struct ndb_filter_elements *els; 2919 2920 if (!(els = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH))) 2921 return NULL; 2922 2923 return ndb_filter_get_string_element(filter, els, 0); 2924 } 2925 2926 int ndb_filter_is_subset_of(const struct ndb_filter *a, const struct ndb_filter *b) 2927 { 2928 int i; 2929 struct ndb_filter_elements *b_field, *a_field; 2930 2931 // Everything is always a subset of {} 2932 if (b->num_elements == 0) 2933 return 1; 2934 2935 // We can't be a subset if the number of elements in the other 2936 // filter is larger then the number of elements we have. 2937 if (b->num_elements > a->num_elements) 2938 return 0; 2939 2940 // If our filter count matches, we can only be a subset if we are 2941 // equal 2942 if (b->num_elements == a->num_elements) 2943 return ndb_filter_eq(a, b); 2944 2945 // If our element count is larger than the other filter, then we 2946 // must see if every element in the other filter exists in ours. If 2947 // so, then we are a subset of the other. 2948 // 2949 // eg: B={k:1, a:b} <- A={t:x, k:1, a:b} 2950 // 2951 // A is a subset of B because `k:1` and `a:b` both exist in A 2952 2953 for (i = 0; i < b->num_elements; i++) { 2954 b_field = ndb_filter_get_elements((struct ndb_filter*)b, i); 2955 a_field = ndb_filter_find_elements((struct ndb_filter*)a, 2956 b_field->field.type); 2957 2958 if (a_field == NULL) 2959 return 0; 2960 2961 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_field, 2962 (struct ndb_filter*)b, b_field)) 2963 return 0; 2964 } 2965 2966 return 1; 2967 } 2968 2969 int ndb_filter_eq(const struct ndb_filter *a, const struct ndb_filter *b) 2970 { 2971 int i; 2972 struct ndb_filter_elements *a_els, *b_els; 2973 2974 if (a->num_elements != b->num_elements) 2975 return 0; 2976 2977 for (i = 0; i < a->num_elements; i++) { 2978 a_els = ndb_filter_get_elements((struct ndb_filter*)a, i); 2979 b_els = ndb_filter_find_elements((struct ndb_filter *)b, 2980 a_els->field.type); 2981 2982 if (b_els == NULL) 2983 return 0; 2984 2985 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_els, 2986 (struct ndb_filter*)b, b_els)) 2987 return 0; 2988 } 2989 2990 return 1; 2991 } 2992 2993 2994 static uint64_t * 2995 ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2996 { 2997 struct ndb_filter_elements *els; 2998 if ((els = ndb_filter_find_elements(filter, typ))) 2999 return &els->elements[0]; 3000 return NULL; 3001 } 3002 3003 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter, 3004 enum ndb_filter_fieldtype typ) 3005 { 3006 uint64_t *el; 3007 if (NULL == (el = ndb_filter_get_elem(filter, typ))) 3008 return NULL; 3009 return el; 3010 } 3011 3012 static inline int push_query_result(struct ndb_query_results *results, 3013 struct ndb_query_result *result) 3014 { 3015 return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result)); 3016 } 3017 3018 static int compare_query_results(const void *pa, const void *pb) 3019 { 3020 struct ndb_query_result *a, *b; 3021 3022 a = (struct ndb_query_result *)pa; 3023 b = (struct ndb_query_result *)pb; 3024 3025 if (a->note->created_at == b->note->created_at) { 3026 return 0; 3027 } else if (a->note->created_at > b->note->created_at) { 3028 return -1; 3029 } else { 3030 return 1; 3031 } 3032 } 3033 3034 static void ndb_query_result_init(struct ndb_query_result *res, 3035 struct ndb_note *note, 3036 uint64_t note_size, 3037 uint64_t note_id) 3038 { 3039 *res = (struct ndb_query_result){ 3040 .note_id = note_id, 3041 .note_size = note_size, 3042 .note = note, 3043 }; 3044 } 3045 3046 static int query_is_full(struct ndb_query_results *results, int limit) 3047 { 3048 if (results->cur.p >= results->cur.end) 3049 return 1; 3050 3051 return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit; 3052 } 3053 3054 static int ndb_query_plan_execute_search(struct ndb_txn *txn, 3055 struct ndb_filter *filter, 3056 struct ndb_query_results *results, 3057 int limit) 3058 { 3059 const char *search; 3060 int i; 3061 struct ndb_text_search_results text_results; 3062 struct ndb_text_search_result *text_result; 3063 struct ndb_text_search_config config; 3064 struct ndb_query_result result; 3065 3066 ndb_default_text_search_config(&config); 3067 3068 if (!(search = ndb_filter_find_search(filter))) 3069 return 0; 3070 3071 if (!ndb_text_search_with(txn, search, &text_results, &config, filter)) 3072 return 0; 3073 3074 for (i = 0; i < text_results.num_results; i++) { 3075 if (query_is_full(results, limit)) 3076 break; 3077 3078 text_result = &text_results.results[i]; 3079 3080 result.note = text_result->note; 3081 result.note_size = text_result->note_size; 3082 result.note_id = text_result->key.note_id; 3083 3084 if (!push_query_result(results, &result)) 3085 break; 3086 } 3087 3088 return 1; 3089 } 3090 3091 static int ndb_query_plan_execute_ids(struct ndb_txn *txn, 3092 struct ndb_filter *filter, 3093 struct ndb_query_results *results, 3094 int limit) 3095 { 3096 MDB_cursor *cur; 3097 MDB_dbi db; 3098 MDB_val k, v; 3099 int rc, i; 3100 struct ndb_filter_elements *ids; 3101 struct ndb_note *note; 3102 struct ndb_query_result res; 3103 struct ndb_tsid tsid, *ptsid; 3104 uint64_t note_id, until, *pint; 3105 size_t note_size; 3106 unsigned char *id; 3107 3108 until = UINT64_MAX; 3109 3110 if (!(ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS))) 3111 return 0; 3112 3113 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3114 until = *pint; 3115 3116 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3117 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3118 return 0; 3119 3120 // for each id in our ids filter, find in the db 3121 for (i = 0; i < ids->count; i++) { 3122 if (query_is_full(results, limit)) 3123 break; 3124 3125 id = ndb_filter_get_id_element(filter, ids, i); 3126 ndb_tsid_init(&tsid, (unsigned char *)id, until); 3127 3128 k.mv_data = &tsid; 3129 k.mv_size = sizeof(tsid); 3130 3131 if (!ndb_cursor_start(cur, &k, &v)) 3132 continue; 3133 3134 ptsid = (struct ndb_tsid *)k.mv_data; 3135 note_id = *(uint64_t*)v.mv_data; 3136 3137 if (memcmp(id, ptsid->id, 32)) 3138 continue; 3139 3140 // get the note because we need it to match against the filter 3141 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3142 continue; 3143 3144 // Sure this particular lookup matched the index query, but 3145 // does it match the entire filter? Check! We also pass in 3146 // things we've already matched via the filter so we don't have 3147 // to check again. This can be pretty important for filters 3148 // with a large number of entries. 3149 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS)) 3150 continue; 3151 3152 ndb_query_result_init(&res, note, note_size, note_id); 3153 if (!push_query_result(results, &res)) 3154 break; 3155 } 3156 3157 mdb_cursor_close(cur); 3158 return 1; 3159 } 3160 3161 // 3162 // encode a tag index key 3163 // 3164 // consists of: 3165 // 3166 // u8 tag 3167 // u8 tag_val_len 3168 // [u8] tag_val_bytes 3169 // u64 created_at 3170 // 3171 static int ndb_encode_tag_key(unsigned char *buf, int buf_size, 3172 char tag, const unsigned char *val, 3173 unsigned char val_len, 3174 uint64_t timestamp) 3175 { 3176 struct cursor writer; 3177 int ok; 3178 3179 // quick exit for obvious case where it will be too big. There can be 3180 // values of val_len that still fail, but we just let the writer handle 3181 // those failure cases 3182 if (val_len >= buf_size) 3183 return 0; 3184 3185 make_cursor(buf, buf + buf_size, &writer); 3186 3187 ok = 3188 cursor_push_byte(&writer, tag) && 3189 cursor_push(&writer, (unsigned char*)val, val_len) && 3190 cursor_push(&writer, (unsigned char*)×tamp, sizeof(timestamp)); 3191 3192 if (!ok) 3193 return 0; 3194 3195 return writer.p - writer.start; 3196 } 3197 3198 static int ndb_query_plan_execute_authors(struct ndb_txn *txn, 3199 struct ndb_filter *filter, 3200 struct ndb_query_results *results, 3201 int limit) 3202 { 3203 MDB_val k, v; 3204 MDB_cursor *cur; 3205 int rc, i; 3206 uint64_t *pint, until, since, note_key; 3207 unsigned char *author; 3208 struct ndb_note *note; 3209 size_t note_size; 3210 struct ndb_filter_elements *authors; 3211 struct ndb_query_result res; 3212 struct ndb_tsid tsid, *ptsid; 3213 enum ndb_dbs db; 3214 3215 db = txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY]; 3216 3217 if (!(authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS))) 3218 return 0; 3219 3220 until = UINT64_MAX; 3221 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3222 until = *pint; 3223 3224 since = 0; 3225 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3226 since = *pint; 3227 3228 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3229 return 0; 3230 3231 for (i = 0; i < authors->count; i++) { 3232 author = ndb_filter_get_id_element(filter, authors, i); 3233 3234 ndb_tsid_init(&tsid, author, until); 3235 3236 k.mv_data = &tsid; 3237 k.mv_size = sizeof(tsid); 3238 3239 if (!ndb_cursor_start(cur, &k, &v)) 3240 continue; 3241 3242 // for each id in our ids filter, find in the db 3243 while (!query_is_full(results, limit)) { 3244 ptsid = (struct ndb_tsid *)k.mv_data; 3245 note_key = *(uint64_t*)v.mv_data; 3246 3247 // don't continue the scan if we're below `since` 3248 if (ptsid->timestamp < since) 3249 break; 3250 3251 // our author should match, if not bail 3252 if (memcmp(author, ptsid->id, 32)) 3253 break; 3254 3255 // fetch the note, we need it for our query results 3256 // and to match further against the filter 3257 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_size))) 3258 goto next; 3259 3260 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_AUTHORS)) 3261 goto next; 3262 3263 ndb_query_result_init(&res, note, note_size, note_key); 3264 if (!push_query_result(results, &res)) 3265 break; 3266 3267 next: 3268 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3269 break; 3270 } 3271 } 3272 3273 mdb_cursor_close(cur); 3274 return 1; 3275 } 3276 3277 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn, 3278 struct ndb_filter *filter, 3279 struct ndb_query_results *results, 3280 int limit) 3281 { 3282 MDB_dbi db; 3283 MDB_val k, v; 3284 MDB_cursor *cur; 3285 int rc; 3286 struct ndb_note *note; 3287 struct ndb_tsid key, *pkey; 3288 uint64_t *pint, until, since, note_id; 3289 size_t note_size; 3290 struct ndb_query_result res; 3291 unsigned char high_key[32] = {0xFF}; 3292 3293 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3294 3295 until = UINT64_MAX; 3296 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3297 until = *pint; 3298 3299 since = 0; 3300 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3301 since = *pint; 3302 3303 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3304 return 0; 3305 3306 // if we have until, start there, otherwise just use max 3307 ndb_tsid_init(&key, high_key, until); 3308 k.mv_data = &key; 3309 k.mv_size = sizeof(key); 3310 3311 if (!ndb_cursor_start(cur, &k, &v)) 3312 return 1; 3313 3314 while (!query_is_full(results, limit)) { 3315 pkey = (struct ndb_tsid *)k.mv_data; 3316 note_id = *(uint64_t*)v.mv_data; 3317 assert(v.mv_size == 8); 3318 3319 // don't continue the scan if we're below `since` 3320 if (pkey->timestamp < since) 3321 break; 3322 3323 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3324 goto next; 3325 3326 // does this entry match our filter? 3327 if (!ndb_filter_matches_with(filter, note, 0)) 3328 goto next; 3329 3330 ndb_query_result_init(&res, note, (uint64_t)note_size, note_id); 3331 if (!push_query_result(results, &res)) 3332 break; 3333 next: 3334 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3335 break; 3336 } 3337 3338 mdb_cursor_close(cur); 3339 return 1; 3340 } 3341 3342 static int ndb_query_plan_execute_tags(struct ndb_txn *txn, 3343 struct ndb_filter *filter, 3344 struct ndb_query_results *results, 3345 int limit) 3346 { 3347 MDB_cursor *cur; 3348 MDB_dbi db; 3349 MDB_val k, v; 3350 int len, taglen, rc, i; 3351 uint64_t *pint, until, note_id; 3352 size_t note_size; 3353 unsigned char key_buffer[255]; 3354 struct ndb_note *note; 3355 struct ndb_filter_elements *tags; 3356 unsigned char *tag; 3357 struct ndb_query_result res; 3358 3359 db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3360 3361 if (!(tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS))) 3362 return 0; 3363 3364 until = UINT64_MAX; 3365 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3366 until = *pint; 3367 3368 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3369 return 0; 3370 3371 for (i = 0; i < tags->count; i++) { 3372 tag = ndb_filter_get_id_element(filter, tags, i); 3373 3374 taglen = tags->field.elem_type == NDB_ELEMENT_ID 3375 ? 32 : strlen((const char*)tag); 3376 3377 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3378 tags->field.tag, tag, taglen, 3379 until))) 3380 return 0; 3381 3382 k.mv_data = key_buffer; 3383 k.mv_size = len; 3384 3385 if (!ndb_cursor_start(cur, &k, &v)) 3386 continue; 3387 3388 // for each id in our ids filter, find in the db 3389 while (!query_is_full(results, limit)) { 3390 // check if tag value matches, bail if not 3391 if (((unsigned char *)k.mv_data)[0] != tags->field.tag) 3392 break; 3393 3394 // check if tag value matches, bail if not 3395 if (taglen != k.mv_size - 9) 3396 break; 3397 3398 if (memcmp((unsigned char *)k.mv_data+1, tag, k.mv_size-9)) 3399 break; 3400 3401 note_id = *(uint64_t*)v.mv_data; 3402 3403 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3404 goto next; 3405 3406 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS)) 3407 goto next; 3408 3409 ndb_query_result_init(&res, note, note_size, note_id); 3410 if (!push_query_result(results, &res)) 3411 break; 3412 3413 next: 3414 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3415 break; 3416 } 3417 } 3418 3419 mdb_cursor_close(cur); 3420 return 1; 3421 } 3422 3423 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, 3424 struct ndb_filter *filter, 3425 struct ndb_query_results *results, 3426 int limit) 3427 { 3428 MDB_cursor *cur; 3429 MDB_dbi db; 3430 MDB_val k, v; 3431 struct ndb_note *note; 3432 struct ndb_u64_ts tsid, *ptsid; 3433 struct ndb_filter_elements *kinds; 3434 struct ndb_query_result res; 3435 uint64_t kind, note_id, until, since, *pint; 3436 size_t note_size; 3437 int i, rc; 3438 3439 // we should have kinds in a kinds filter! 3440 if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS))) 3441 return 0; 3442 3443 until = UINT64_MAX; 3444 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3445 until = *pint; 3446 3447 since = 0; 3448 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3449 since = *pint; 3450 3451 db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3452 3453 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3454 return 0; 3455 3456 for (i = 0; i < kinds->count; i++) { 3457 if (query_is_full(results, limit)) 3458 break; 3459 3460 kind = kinds->elements[i]; 3461 ndb_debug("kind %" PRIu64 "\n", kind); 3462 ndb_u64_ts_init(&tsid, kind, until); 3463 3464 k.mv_data = &tsid; 3465 k.mv_size = sizeof(tsid); 3466 3467 if (!ndb_cursor_start(cur, &k, &v)) 3468 continue; 3469 3470 // for each id in our ids filter, find in the db 3471 while (!query_is_full(results, limit)) { 3472 ptsid = (struct ndb_u64_ts *)k.mv_data; 3473 if (ptsid->u64 != kind) 3474 break; 3475 3476 // don't continue the scan if we're below `since` 3477 if (ptsid->timestamp < since) 3478 break; 3479 3480 note_id = *(uint64_t*)v.mv_data; 3481 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3482 goto next; 3483 3484 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS)) 3485 goto next; 3486 3487 ndb_query_result_init(&res, note, note_size, note_id); 3488 if (!push_query_result(results, &res)) 3489 break; 3490 3491 next: 3492 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3493 break; 3494 } 3495 } 3496 3497 mdb_cursor_close(cur); 3498 return 1; 3499 } 3500 3501 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter) 3502 { 3503 struct ndb_filter_elements *ids, *kinds, *authors, *tags, *search; 3504 3505 ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS); 3506 search = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH); 3507 kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS); 3508 authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS); 3509 tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS); 3510 3511 // this is rougly similar to the heuristic in strfry's dbscan 3512 if (search) { 3513 return NDB_PLAN_SEARCH; 3514 } else if (ids) { 3515 return NDB_PLAN_IDS; 3516 } else if (kinds && authors && authors->count <= 10) { 3517 return NDB_PLAN_AUTHOR_KINDS; 3518 } else if (authors && authors->count <= 10) { 3519 return NDB_PLAN_AUTHORS; 3520 } else if (tags && tags->count <= 10) { 3521 return NDB_PLAN_TAGS; 3522 } else if (kinds) { 3523 return NDB_PLAN_KINDS; 3524 } 3525 3526 return NDB_PLAN_CREATED; 3527 } 3528 3529 static const char *ndb_query_plan_name(int plan_id) 3530 { 3531 switch (plan_id) { 3532 case NDB_PLAN_IDS: return "ids"; 3533 case NDB_PLAN_SEARCH: return "search"; 3534 case NDB_PLAN_KINDS: return "kinds"; 3535 case NDB_PLAN_TAGS: return "tags"; 3536 case NDB_PLAN_CREATED: return "created"; 3537 case NDB_PLAN_AUTHORS: return "authors"; 3538 } 3539 3540 return "unknown"; 3541 } 3542 3543 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, 3544 struct ndb_query_result *res, int capacity, 3545 int *results_out) 3546 { 3547 struct ndb_query_results results; 3548 uint64_t limit, *pint; 3549 enum ndb_query_plan plan; 3550 limit = capacity; 3551 3552 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 3553 limit = *pint; 3554 3555 limit = min(capacity, limit); 3556 make_cursor((unsigned char *)res, 3557 ((unsigned char *)res) + limit * sizeof(*res), 3558 &results.cur); 3559 3560 plan = ndb_filter_plan(filter); 3561 ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan)); 3562 switch (plan) { 3563 // We have a list of ids, just open a cursor and jump to each once 3564 case NDB_PLAN_IDS: 3565 if (!ndb_query_plan_execute_ids(txn, filter, &results, limit)) 3566 return 0; 3567 break; 3568 3569 case NDB_PLAN_SEARCH: 3570 if (!ndb_query_plan_execute_search(txn, filter, &results, limit)) 3571 return 0; 3572 break; 3573 3574 // We have just kinds, just scan the kind index 3575 case NDB_PLAN_KINDS: 3576 if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit)) 3577 return 0; 3578 break; 3579 3580 case NDB_PLAN_TAGS: 3581 if (!ndb_query_plan_execute_tags(txn, filter, &results, limit)) 3582 return 0; 3583 break; 3584 case NDB_PLAN_CREATED: 3585 if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit)) 3586 return 0; 3587 break; 3588 case NDB_PLAN_AUTHORS: 3589 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3590 return 0; 3591 break; 3592 case NDB_PLAN_AUTHOR_KINDS: 3593 /* TODO: author kinds 3594 if (!ndb_query_plan_execute_author_kinds(txn, filter, &results, limit)) 3595 return 0; 3596 */ 3597 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3598 return 0; 3599 break; 3600 } 3601 3602 *results_out = cursor_count(&results.cur, sizeof(*res)); 3603 return 1; 3604 } 3605 3606 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, 3607 struct ndb_query_result *results, int result_capacity, int *count) 3608 { 3609 int i, out; 3610 struct ndb_query_result *p = results; 3611 3612 out = 0; 3613 *count = 0; 3614 3615 for (i = 0; i < num_filters; i++) { 3616 if (!ndb_query_filter(txn, &filters[i], p, 3617 result_capacity, &out)) { 3618 return 0; 3619 } 3620 3621 *count += out; 3622 p += out; 3623 result_capacity -= out; 3624 if (result_capacity <= 0) 3625 break; 3626 } 3627 3628 // sort results 3629 qsort(results, *count, sizeof(*results), compare_query_results); 3630 return 1; 3631 } 3632 3633 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note, 3634 uint64_t note_key) 3635 { 3636 unsigned char key_buffer[255]; 3637 struct ndb_iterator iter; 3638 struct ndb_str tkey, tval; 3639 char tchar; 3640 int len, rc; 3641 MDB_val key, val; 3642 MDB_dbi tags_db; 3643 3644 tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3645 3646 ndb_tags_iterate_start(note, &iter); 3647 3648 while (ndb_tags_iterate_next(&iter)) { 3649 if (iter.tag->count < 2) 3650 continue; 3651 3652 tkey = ndb_tag_str(note, iter.tag, 0); 3653 3654 // we only write indices for 1-char tags. 3655 tchar = tkey.str[0]; 3656 if (tchar == 0 || tkey.str[1] != 0) 3657 continue; 3658 3659 tval = ndb_tag_str(note, iter.tag, 1); 3660 len = ndb_str_len(&tval); 3661 3662 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3663 tchar, tval.id, (unsigned char)len, 3664 ndb_note_created_at(note)))) { 3665 // this will fail when we try to encode a key that is 3666 // too big 3667 continue; 3668 } 3669 3670 //ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len); 3671 3672 key.mv_data = key_buffer; 3673 key.mv_size = len; 3674 3675 val.mv_data = ¬e_key; 3676 val.mv_size = sizeof(note_key); 3677 3678 if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) { 3679 ndb_debug("write note tag index to db failed: %s\n", 3680 mdb_strerror(rc)); 3681 return 0; 3682 } 3683 } 3684 3685 return 1; 3686 } 3687 3688 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 3689 uint64_t note_key) 3690 { 3691 struct ndb_u64_ts tsid; 3692 int rc; 3693 MDB_val key, val; 3694 MDB_dbi kind_db; 3695 3696 ndb_u64_ts_init(&tsid, note->kind, note->created_at); 3697 3698 key.mv_data = &tsid; 3699 key.mv_size = sizeof(tsid); 3700 val.mv_data = ¬e_key; 3701 val.mv_size = sizeof(note_key); 3702 3703 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3704 3705 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 3706 ndb_debug("write note kind index to db failed: %s\n", 3707 mdb_strerror(rc)); 3708 return 0; 3709 } 3710 3711 return 1; 3712 } 3713 3714 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 3715 int word_len, int word_index, 3716 uint64_t timestamp, uint64_t note_id) 3717 { 3718 // cap to some reasonable key size 3719 unsigned char buffer[1024]; 3720 int keysize, rc; 3721 MDB_val k, v; 3722 MDB_dbi text_db; 3723 3724 // build our compressed text index key 3725 if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index, 3726 word_len, word, timestamp, note_id, 3727 &keysize)) { 3728 // probably too big 3729 3730 return 0; 3731 } 3732 3733 k.mv_data = buffer; 3734 k.mv_size = keysize; 3735 3736 v.mv_data = NULL; 3737 v.mv_size = 0; 3738 3739 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 3740 3741 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 3742 ndb_debug("write note text index to db failed: %s\n", 3743 mdb_strerror(rc)); 3744 return 0; 3745 } 3746 3747 return 1; 3748 } 3749 3750 3751 3752 // break a string into individual words for querying or for building the 3753 // fulltext search index. This is callback based so we don't need to 3754 // build up an intermediate structure 3755 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 3756 { 3757 int word_len, words; 3758 const char *word; 3759 3760 words = 0; 3761 3762 while (cur->p < cur->end) { 3763 consume_whitespace_or_punctuation(cur); 3764 if (cur->p >= cur->end) 3765 break; 3766 word = (const char *)cur->p; 3767 3768 if (!consume_until_boundary(cur)) 3769 break; 3770 3771 // start of word or end 3772 word_len = cur->p - (unsigned char *)word; 3773 if (word_len == 0 && cur->p >= cur->end) 3774 break; 3775 3776 if (word_len == 0) { 3777 if (!cursor_skip(cur, 1)) 3778 break; 3779 continue; 3780 } 3781 3782 //ndb_debug("writing word index '%.*s'\n", word_len, word); 3783 3784 if (!fn(ctx, word, word_len, words)) 3785 continue; 3786 3787 words++; 3788 } 3789 3790 return 1; 3791 } 3792 3793 struct ndb_word_writer_ctx 3794 { 3795 struct ndb_txn *txn; 3796 struct ndb_note *note; 3797 uint64_t note_id; 3798 }; 3799 3800 static int ndb_fulltext_word_writer(void *ctx, 3801 const char *word, int word_len, int words) 3802 { 3803 struct ndb_word_writer_ctx *wctx = ctx; 3804 3805 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 3806 wctx->note->created_at, wctx->note_id)) { 3807 // too big to write this one, just skip it 3808 ndb_debug("failed to write word '%.*s' to index\n", word_len, word); 3809 3810 return 0; 3811 } 3812 3813 //fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word); 3814 return 1; 3815 } 3816 3817 static int ndb_write_note_fulltext_index(struct ndb_txn *txn, 3818 struct ndb_note *note, 3819 uint64_t note_id) 3820 { 3821 struct cursor cur; 3822 unsigned char *content; 3823 struct ndb_str str; 3824 struct ndb_word_writer_ctx ctx; 3825 3826 str = ndb_note_str(note, ¬e->content); 3827 // I don't think this should happen? 3828 if (unlikely(str.flag == NDB_PACKED_ID)) 3829 return 0; 3830 3831 content = (unsigned char *)str.str; 3832 3833 make_cursor(content, content + note->content_length, &cur); 3834 3835 ctx.txn = txn; 3836 ctx.note = note; 3837 ctx.note_id = note_id; 3838 3839 ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer); 3840 3841 return 1; 3842 } 3843 3844 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index) 3845 { 3846 (void)word_index; 3847 struct ndb_search_words *words = ctx; 3848 struct ndb_word *word; 3849 3850 if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS) 3851 return 0; 3852 3853 word = &words->words[words->num_words++]; 3854 word->word = word_str; 3855 word->word_len = word_len; 3856 3857 return 1; 3858 } 3859 3860 static void ndb_search_words_init(struct ndb_search_words *words) 3861 { 3862 words->num_words = 0; 3863 } 3864 3865 static int prefix_count(const char *str1, int len1, const char *str2, int len2) { 3866 int i, count = 0; 3867 int min_len = len1 < len2 ? len1 : len2; 3868 3869 for (i = 0; i < min_len; i++) { 3870 // case insensitive 3871 if (tolower(str1[i]) == tolower(str2[i])) 3872 count++; 3873 else 3874 break; 3875 } 3876 3877 return count; 3878 } 3879 3880 static int ndb_prefix_matches(struct ndb_text_search_result *result, 3881 struct ndb_word *search_word) 3882 { 3883 // Empty strings shouldn't happen but let's 3884 if (result->key.str_len < 2 || search_word->word_len < 2) 3885 return 0; 3886 3887 // make sure we at least have two matching prefix characters. exact 3888 // matches are nice but range searches allow us to match prefixes as 3889 // well. A double-char prefix is suffient, but maybe we could up this 3890 // in the future. 3891 // 3892 // TODO: How are we handling utf-8 prefix matches like 3893 // japanese? 3894 // 3895 if ( result->key.str[0] != tolower(search_word->word[0]) 3896 && result->key.str[1] != tolower(search_word->word[1]) 3897 ) 3898 return 0; 3899 3900 // count the number of prefix-matched characters. This will be used 3901 // for ranking search results 3902 result->prefix_chars = prefix_count(result->key.str, 3903 result->key.str_len, 3904 search_word->word, 3905 search_word->word_len); 3906 3907 if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 3908 return 0; 3909 3910 return 1; 3911 } 3912 3913 // This is called when scanning the full text search index. Scanning stops 3914 // when we no longer have a prefix match for the word 3915 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, 3916 MDB_val *k, struct ndb_word *search_word, 3917 struct ndb_text_search_result *last_result, 3918 struct ndb_text_search_result *result, 3919 MDB_cursor_op order_op) 3920 { 3921 struct cursor key_cursor; 3922 //struct ndb_text_search_key search_key; 3923 MDB_val v; 3924 int retries; 3925 retries = -1; 3926 3927 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 3928 3929 // When op is MDB_SET_RANGE, this initializes the search. Position 3930 // the cursor at the next key greater than or equal to the specified 3931 // key. 3932 // 3933 // Subsequent searches should use MDB_NEXT 3934 if (mdb_cursor_get(cursor, k, &v, op)) { 3935 // we should only do this if we're going in reverse 3936 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 3937 // if set range worked and our key exists, it should be 3938 // the one right before this one 3939 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 3940 return 0; 3941 } else { 3942 return 0; 3943 } 3944 } 3945 3946 retry: 3947 retries++; 3948 /* 3949 printf("continuing from "); 3950 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 3951 ndb_print_text_search_key(&search_key); 3952 } else { printf("??"); } 3953 printf("\n"); 3954 */ 3955 3956 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 3957 3958 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 3959 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 3960 return 0; 3961 } 3962 3963 // if the note id doesn't match the last result, then we stop trying 3964 // each search word 3965 if (last_result && last_result->key.note_id != result->key.note_id) { 3966 return 0; 3967 } 3968 3969 // On success, this could still be not related at all. 3970 // It could just be adjacent to the word. Let's check 3971 // if we have a matching prefix at least. 3972 3973 // Before we unpack the entire key, let's quickly 3974 // unpack just the string to check the prefix. We don't 3975 // need to unpack the entire key if the prefix doesn't 3976 // match 3977 if (!ndb_unpack_text_search_key_string(&key_cursor, 3978 &result->key.str, 3979 &result->key.str_len)) { 3980 // this should never happen 3981 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 3982 return 0; 3983 } 3984 3985 if (!ndb_prefix_matches(result, search_word)) { 3986 /* 3987 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 3988 result->key.str_len, result->key.str, 3989 search_word->word_len, search_word->word); 3990 */ 3991 // we should only do this if we're going in reverse 3992 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 3993 // if set range worked and our key exists, it should be 3994 // the one right before this one 3995 mdb_cursor_get(cursor, k, &v, MDB_PREV); 3996 goto retry; 3997 } else { 3998 return 0; 3999 } 4000 } 4001 4002 // Unpack the remaining text search key, we will need this information 4003 // when building up our search results. 4004 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 4005 // This should never happen 4006 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 4007 return 0; 4008 } 4009 4010 /* 4011 if (last_result) { 4012 if (result->key.word_index < last_result->key.word_index) { 4013 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 4014 result->key.str_len, result->key.str, 4015 last_result->key.str_len, last_result->key.str); 4016 return 0; 4017 } 4018 } 4019 */ 4020 4021 return 1; 4022 } 4023 4024 static void ndb_text_search_results_init( 4025 struct ndb_text_search_results *results) { 4026 results->num_results = 0; 4027 } 4028 4029 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 4030 { 4031 cfg->order = NDB_ORDER_DESCENDING; 4032 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 4033 } 4034 4035 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 4036 enum ndb_search_order order) 4037 { 4038 cfg->order = order; 4039 } 4040 4041 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 4042 { 4043 cfg->limit = limit; 4044 } 4045 4046 static int compare_search_words(const void *pa, const void *pb) 4047 { 4048 struct ndb_word *a, *b; 4049 4050 a = (struct ndb_word *)pa; 4051 b = (struct ndb_word *)pb; 4052 4053 if (a->word_len == b->word_len) { 4054 return 0; 4055 } else if (a->word_len > b->word_len) { 4056 // biggest words should be at the front of the list, 4057 // so we say it's "smaller" here 4058 return -1; 4059 } else { 4060 return 1; 4061 } 4062 } 4063 4064 // Sort search words from largest to smallest. Larger words are less likely 4065 // in the index, allowing our scan to walk fewer words at the root when 4066 // recursively matching. 4067 void sort_largest_to_smallest(struct ndb_search_words *words) 4068 { 4069 qsort(words->words, words->num_words, sizeof(words->words[0]), compare_search_words); 4070 } 4071 4072 4073 int ndb_text_search_with(struct ndb_txn *txn, const char *query, 4074 struct ndb_text_search_results *results, 4075 struct ndb_text_search_config *config, 4076 struct ndb_filter *filter) 4077 { 4078 unsigned char buffer[1024], *buf; 4079 unsigned char saved_buf[1024], *saved; 4080 struct ndb_text_search_result *result, *last_result; 4081 struct ndb_text_search_result candidate, last_candidate; 4082 struct ndb_search_words search_words; 4083 //struct ndb_text_search_key search_key; 4084 struct ndb_word *search_word; 4085 struct ndb_note *note; 4086 struct cursor cur; 4087 uint64_t since, until, timestamp_op, *pint; 4088 size_t note_size; 4089 ndb_text_search_key_order_fn key_order_fn; 4090 MDB_dbi text_db; 4091 MDB_cursor *cursor; 4092 MDB_val k, v; 4093 int i, j, keysize, saved_size, limit; 4094 MDB_cursor_op op, order_op; 4095 4096 note_size = 0; 4097 note = 0; 4098 saved = NULL; 4099 ndb_text_search_results_init(results); 4100 ndb_search_words_init(&search_words); 4101 4102 until = UINT64_MAX; 4103 since = 0; 4104 limit = MAX_TEXT_SEARCH_RESULTS; 4105 4106 // until, since from filter 4107 if (filter != NULL) { 4108 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 4109 until = *pint; 4110 4111 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 4112 since = *pint; 4113 4114 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 4115 limit = *pint; 4116 } 4117 4118 order_op = MDB_PREV; 4119 key_order_fn = ndb_make_text_search_key_high; 4120 timestamp_op = until; 4121 if (config) { 4122 if (config->order == NDB_ORDER_ASCENDING) { 4123 order_op = MDB_NEXT; 4124 // set the min timestamp value to since when ascending 4125 timestamp_op = since; 4126 key_order_fn = ndb_make_text_search_key_low; 4127 } 4128 limit = min(limit, config->limit); 4129 } 4130 // end search config 4131 4132 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 4133 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 4134 4135 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 4136 if (search_words.num_words == 0) 4137 return 0; 4138 4139 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 4140 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 4141 return 0; 4142 } 4143 4144 // This should complete the query quicker because the larger words are 4145 // likely to have fewer entries in the search index. This is not always 4146 // true. Words with higher frequency (like bitcoin on nostr in 2024) 4147 // may be slower. TODO: Skip word recursion by leveraging a minimal 4148 // perfect hashmap of parsed words on a note 4149 sort_largest_to_smallest(&search_words); 4150 4151 // for each word, we recursively find all of the submatches 4152 while (results->num_results < limit) { 4153 last_result = NULL; 4154 result = &results->results[results->num_results]; 4155 4156 // if we have saved, then we continue from the last root search 4157 // sequence 4158 if (saved) { 4159 buf = saved_buf; 4160 saved = NULL; 4161 keysize = saved_size; 4162 4163 k.mv_data = buf; 4164 k.mv_size = saved_size; 4165 4166 // reposition the cursor so we can continue 4167 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 4168 break; 4169 4170 op = order_op; 4171 } else { 4172 // construct a packed fulltext search key using this 4173 // word. This key doesn't contain any timestamp or index 4174 // info, so it should range match instead of exact 4175 // match 4176 if (!key_order_fn(buffer, sizeof(buffer), 4177 search_words.words[0].word_len, 4178 search_words.words[0].word, 4179 timestamp_op, 4180 &keysize)) 4181 { 4182 // word is too big to fit in 1024-sized key 4183 continue; 4184 } 4185 4186 buf = buffer; 4187 op = MDB_SET_RANGE; 4188 } 4189 4190 for (j = 0; j < search_words.num_words; j++) { 4191 search_word = &search_words.words[j]; 4192 4193 // shouldn't happen but let's be defensive a bit 4194 if (search_word->word_len == 0) 4195 continue; 4196 4197 // if we already matched a note in this phrase, make 4198 // sure we're including the note id in the query 4199 if (last_result) { 4200 // we are narrowing down a search. 4201 // if we already have this note id, just continue 4202 for (i = 0; i < results->num_results; i++) { 4203 if (results->results[i].key.note_id == last_result->key.note_id) 4204 // we can't break here to 4205 // leave the word loop so 4206 // have to use a goto 4207 goto cont; 4208 } 4209 4210 if (!ndb_make_noted_text_search_key( 4211 buffer, sizeof(buffer), 4212 search_word->word_len, 4213 search_word->word, 4214 last_result->key.timestamp, 4215 last_result->key.note_id, 4216 &keysize)) 4217 { 4218 continue; 4219 } 4220 4221 buf = buffer; 4222 } 4223 4224 k.mv_data = buf; 4225 k.mv_size = keysize; 4226 4227 // TODO: we can speed this up with the minimal perfect 4228 // hashmap by quickly rejecting the remaining words 4229 // by looking in the word hashmap on the note. This 4230 // would allow us to skip the recursive word lookup 4231 // thing 4232 if (!ndb_text_search_next_word(cursor, op, &k, 4233 search_word, 4234 last_result, 4235 &candidate, 4236 order_op)) { 4237 // we didn't find a match for this note_id 4238 if (j == 0) 4239 // if we're at one of the root words, 4240 // this means that there are no further 4241 // root word matches for any note, so 4242 // we know we're done 4243 goto done; 4244 else 4245 break; 4246 } 4247 4248 *result = candidate; 4249 op = MDB_SET_RANGE; 4250 4251 // save the first key match, since we will continue from 4252 // this on the next root word result 4253 if (j == 0) { 4254 if (!saved) { 4255 memcpy(saved_buf, k.mv_data, k.mv_size); 4256 saved = saved_buf; 4257 saved_size = k.mv_size; 4258 } 4259 4260 // since we will be trying to match the same 4261 // note_id on all subsequent word matches, 4262 // let's lookup this note and make sure it 4263 // matches the filter if we have one. If it 4264 // doesn't match, we can quickly skip the 4265 // remaining word queries 4266 if (filter) { 4267 if ((note = ndb_get_note_by_key(txn, 4268 result->key.note_id, 4269 ¬e_size))) 4270 { 4271 if (!ndb_filter_matches(filter, note)) { 4272 break; 4273 } 4274 result->note = note; 4275 result->note_size = note_size; 4276 } 4277 } 4278 } 4279 4280 result->note = note; 4281 result->note_size = note_size; 4282 last_candidate = *result; 4283 last_result = &last_candidate; 4284 } 4285 4286 // we matched all of the queries! 4287 if (j == search_words.num_words) { 4288 results->num_results++; 4289 } 4290 4291 cont: 4292 ; 4293 } 4294 4295 done: 4296 mdb_cursor_close(cursor); 4297 4298 return 1; 4299 } 4300 4301 int ndb_text_search(struct ndb_txn *txn, const char *query, 4302 struct ndb_text_search_results *results, 4303 struct ndb_text_search_config *config) 4304 { 4305 return ndb_text_search_with(txn, query, results, config, NULL); 4306 } 4307 4308 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key, 4309 struct ndb_blocks *blocks) 4310 { 4311 int rc; 4312 MDB_val key, val; 4313 4314 // make sure we're not writing the owned flag to the db 4315 blocks->flags &= ~NDB_BLOCK_FLAG_OWNED; 4316 4317 key.mv_data = ¬e_key; 4318 key.mv_size = sizeof(note_key); 4319 val.mv_data = blocks; 4320 val.mv_size = ndb_blocks_total_size(blocks); 4321 assert((val.mv_size % 8) == 0); 4322 4323 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) { 4324 ndb_debug("write version to note_blocks failed: %s\n", 4325 mdb_strerror(rc)); 4326 return; 4327 } 4328 } 4329 4330 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, 4331 uint64_t note_key, unsigned char *scratch, 4332 size_t scratch_size) 4333 { 4334 size_t content_len; 4335 const char *content; 4336 struct ndb_blocks *blocks; 4337 4338 content_len = ndb_note_content_length(note); 4339 content = ndb_note_content(note); 4340 4341 if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) { 4342 //ndb_debug("failed to parse content '%.*s'\n", content_len, content); 4343 return 0; 4344 } 4345 4346 ndb_write_blocks(txn, note_key, blocks); 4347 return 1; 4348 } 4349 4350 static uint64_t ndb_write_note(struct ndb_txn *txn, 4351 struct ndb_writer_note *note, 4352 unsigned char *scratch, size_t scratch_size, 4353 uint32_t ndb_flags) 4354 { 4355 int rc; 4356 uint64_t note_key, kind; 4357 MDB_dbi note_db; 4358 MDB_val key, val; 4359 4360 kind = note->note->kind; 4361 4362 // let's quickly sanity check if we already have this note 4363 if (ndb_get_notekey_by_id(txn, note->note->id)) 4364 return 0; 4365 4366 // get dbs 4367 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 4368 4369 // get new key 4370 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 4371 4372 // write note to event store 4373 key.mv_data = ¬e_key; 4374 key.mv_size = sizeof(note_key); 4375 val.mv_data = note->note; 4376 val.mv_size = note->note_len; 4377 4378 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 4379 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 4380 return 0; 4381 } 4382 4383 ndb_write_note_id_index(txn, note->note, note_key); 4384 ndb_write_note_kind_index(txn, note->note, note_key); 4385 ndb_write_note_tag_index(txn, note->note, note_key); 4386 ndb_write_note_pubkey_index(txn, note->note, note_key); 4387 ndb_write_note_pubkey_kind_index(txn, note->note, note_key); 4388 4389 // only parse content and do fulltext index on text and longform notes 4390 if (kind == 1 || kind == 30023) { 4391 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_FULLTEXT)) { 4392 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 4393 return 0; 4394 } 4395 4396 // write note blocks 4397 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_NOTE_BLOCKS)) { 4398 ndb_write_new_blocks(txn, note->note, note_key, scratch, scratch_size); 4399 } 4400 } else if (kind == 7 && !ndb_flag_set(ndb_flags, NDB_FLAG_NO_STATS)) { 4401 ndb_write_reaction_stats(txn, note->note); 4402 } 4403 4404 return note_key; 4405 } 4406 4407 static void ndb_monitor_lock(struct ndb_monitor *mon) { 4408 pthread_mutex_lock(&mon->mutex); 4409 } 4410 4411 static void ndb_monitor_unlock(struct ndb_monitor *mon) { 4412 pthread_mutex_unlock(&mon->mutex); 4413 } 4414 4415 struct written_note { 4416 uint64_t note_id; 4417 struct ndb_writer_note *note; 4418 }; 4419 4420 // When the data has been committed to the database, take all of the written 4421 // notes, check them against subscriptions, and then write to the subscription 4422 // inbox for all matching notes 4423 static void ndb_notify_subscriptions(struct ndb_monitor *monitor, 4424 struct written_note *wrote, int num_notes) 4425 { 4426 int i, k; 4427 int pushed; 4428 struct written_note *written; 4429 struct ndb_note *note; 4430 struct ndb_subscription *sub; 4431 4432 ndb_monitor_lock(monitor); 4433 4434 for (i = 0; i < monitor->num_subscriptions; i++) { 4435 sub = &monitor->subscriptions[i]; 4436 ndb_debug("checking subscription %d, %d notes\n", i, num_notes); 4437 4438 pushed = 0; 4439 for (k = 0; k < num_notes; k++) { 4440 written = &wrote[k]; 4441 note = written->note->note; 4442 4443 if (ndb_filter_group_matches(&sub->group, note)) { 4444 ndb_debug("pushing note\n"); 4445 4446 if (!prot_queue_push(&sub->inbox, &written->note_id)) { 4447 ndb_debug("couldn't push note to subscriber"); 4448 } else { 4449 pushed++; 4450 } 4451 } else { 4452 ndb_debug("not pushing note\n"); 4453 } 4454 } 4455 4456 // After pushing all of the matching notes, check to see if we 4457 // have a registered subscription callback. If so, we call it. 4458 // The callback needs to call ndb_poll_for_notes to pull data 4459 // that was just pushed to the queue in the for loop above. 4460 if (monitor->sub_cb != NULL && pushed > 0) { 4461 monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); 4462 } 4463 } 4464 4465 ndb_monitor_unlock(monitor); 4466 } 4467 4468 uint64_t ndb_write_note_and_profile( 4469 struct ndb_txn *txn, 4470 struct ndb_writer_profile *profile, 4471 unsigned char *scratch, 4472 size_t scratch_size, 4473 uint32_t ndb_flags) 4474 { 4475 uint64_t note_nkey; 4476 4477 note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags); 4478 4479 if (profile->record.builder) { 4480 // only write if parsing didn't fail 4481 ndb_write_profile(txn, profile, note_nkey); 4482 } 4483 4484 return note_nkey; 4485 } 4486 4487 // only to be called from the writer thread 4488 static int ndb_write_version(struct ndb_txn *txn, uint64_t version) 4489 { 4490 int rc; 4491 MDB_val key, val; 4492 uint64_t version_key; 4493 4494 version_key = NDB_META_KEY_VERSION; 4495 4496 key.mv_data = &version_key; 4497 key.mv_size = sizeof(version_key); 4498 val.mv_data = &version; 4499 val.mv_size = sizeof(version); 4500 4501 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { 4502 ndb_debug("write version to ndb_meta failed: %s\n", 4503 mdb_strerror(rc)); 4504 return 0; 4505 } 4506 4507 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 4508 return 1; 4509 } 4510 4511 4512 static int ndb_run_migrations(struct ndb_txn *txn) 4513 { 4514 int64_t version, latest_version, i; 4515 4516 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 4517 4518 if ((version = ndb_db_version(txn)) == -1) { 4519 ndb_debug("run_migrations: no version found, assuming new db\n"); 4520 version = latest_version; 4521 4522 // no version found. fresh db? 4523 if (!ndb_write_version(txn, version)) { 4524 fprintf(stderr, "run_migrations: failed writing db version"); 4525 return 0; 4526 } 4527 4528 return 1; 4529 } else { 4530 ndb_debug("ndb: version %" PRIu64 " found\n", version); 4531 } 4532 4533 if (version < latest_version) 4534 fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", 4535 (int)version, (int)latest_version); 4536 4537 for (i = version; i < latest_version; i++) { 4538 if (!MIGRATIONS[i].fn(txn)) { 4539 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 4540 return 0; 4541 } 4542 4543 if (!ndb_write_version(txn, i+1)) { 4544 fprintf(stderr, "run_migrations: failed writing db version"); 4545 return 0; 4546 } 4547 4548 version = i+1; 4549 } 4550 4551 return 1; 4552 } 4553 4554 4555 static void *ndb_writer_thread(void *data) 4556 { 4557 ndb_debug("started writer thread\n"); 4558 struct ndb_writer *writer = data; 4559 struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; 4560 struct written_note written_notes[THREAD_QUEUE_BATCH]; 4561 size_t scratch_size; 4562 int i, popped, done, needs_commit, num_notes; 4563 uint64_t note_nkey; 4564 struct ndb_txn txn; 4565 unsigned char *scratch; 4566 4567 // 8mb scratch buffer for parsing note content 4568 scratch_size = 8 * 1024 * 1024; 4569 scratch = malloc(scratch_size); 4570 MDB_txn *mdb_txn = NULL; 4571 ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); 4572 4573 done = 0; 4574 while (!done) { 4575 txn.mdb_txn = NULL; 4576 num_notes = 0; 4577 ndb_debug("writer waiting for items\n"); 4578 popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); 4579 ndb_debug("writer popped %d items\n", popped); 4580 4581 needs_commit = 0; 4582 for (i = 0 ; i < popped; i++) { 4583 msg = &msgs[i]; 4584 switch (msg->type) { 4585 case NDB_WRITER_NOTE: needs_commit = 1; break; 4586 case NDB_WRITER_PROFILE: needs_commit = 1; break; 4587 case NDB_WRITER_DBMETA: needs_commit = 1; break; 4588 case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; 4589 case NDB_WRITER_BLOCKS: needs_commit = 1; break; 4590 case NDB_WRITER_MIGRATE: needs_commit = 1; break; 4591 case NDB_WRITER_QUIT: break; 4592 } 4593 } 4594 4595 if (needs_commit && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) 4596 { 4597 fprintf(stderr, "writer thread txn_begin failed"); 4598 // should definitely not happen unless DB is full 4599 // or something ? 4600 continue; 4601 } 4602 4603 for (i = 0; i < popped; i++) { 4604 msg = &msgs[i]; 4605 4606 switch (msg->type) { 4607 case NDB_WRITER_QUIT: 4608 // quits are handled before this 4609 ndb_debug("writer thread got quit message\n"); 4610 done = 1; 4611 continue; 4612 case NDB_WRITER_PROFILE: 4613 note_nkey = 4614 ndb_write_note_and_profile( 4615 &txn, 4616 &msg->profile, 4617 scratch, 4618 scratch_size, 4619 writer->ndb_flags); 4620 4621 if (note_nkey > 0) { 4622 written_notes[num_notes++] = 4623 (struct written_note){ 4624 .note_id = note_nkey, 4625 .note = &msg->profile.note, 4626 }; 4627 } else { 4628 ndb_debug("failed to write note\n"); 4629 } 4630 break; 4631 case NDB_WRITER_NOTE: 4632 note_nkey = ndb_write_note(&txn, &msg->note, 4633 scratch, 4634 scratch_size, 4635 writer->ndb_flags); 4636 4637 if (note_nkey > 0) { 4638 written_notes[num_notes++] = (struct written_note){ 4639 .note_id = note_nkey, 4640 .note = &msg->note, 4641 }; 4642 } 4643 break; 4644 case NDB_WRITER_DBMETA: 4645 ndb_write_version(&txn, msg->ndb_meta.version); 4646 break; 4647 case NDB_WRITER_BLOCKS: 4648 ndb_write_blocks(&txn, msg->blocks.note_key, 4649 msg->blocks.blocks); 4650 break; 4651 case NDB_WRITER_MIGRATE: 4652 if (!ndb_run_migrations(&txn)) { 4653 mdb_txn_abort(txn.mdb_txn); 4654 goto bail; 4655 } 4656 break; 4657 case NDB_WRITER_PROFILE_LAST_FETCH: 4658 ndb_writer_last_profile_fetch(&txn, 4659 msg->last_fetch.pubkey, 4660 msg->last_fetch.fetched_at 4661 ); 4662 break; 4663 } 4664 } 4665 4666 // commit writes 4667 if (needs_commit) { 4668 if (!ndb_end_query(&txn)) { 4669 ndb_debug("writer thread txn commit failed\n"); 4670 } else { 4671 ndb_debug("notifying subscriptions, %d notes\n", num_notes); 4672 ndb_notify_subscriptions(writer->monitor, 4673 written_notes, 4674 num_notes); 4675 // update subscriptions 4676 } 4677 } 4678 4679 // free notes 4680 for (i = 0; i < popped; i++) { 4681 msg = &msgs[i]; 4682 if (msg->type == NDB_WRITER_NOTE) { 4683 free(msg->note.note); 4684 } else if (msg->type == NDB_WRITER_PROFILE) { 4685 free(msg->profile.note.note); 4686 //ndb_profile_record_builder_free(&msg->profile.record); 4687 } else if (msg->type == NDB_WRITER_BLOCKS) { 4688 ndb_blocks_free(msg->blocks.blocks); 4689 } 4690 } 4691 } 4692 4693 bail: 4694 free(scratch); 4695 ndb_debug("quitting writer thread\n"); 4696 return NULL; 4697 } 4698 4699 static void *ndb_ingester_thread(void *data) 4700 { 4701 secp256k1_context *ctx; 4702 struct thread *thread = data; 4703 struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; 4704 struct ndb_lmdb *lmdb = ingester->lmdb; 4705 struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; 4706 struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; 4707 int i, to_write, popped, done, any_event; 4708 MDB_txn *read_txn = NULL; 4709 int rc; 4710 4711 ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); 4712 ndb_debug("started ingester thread\n"); 4713 4714 done = 0; 4715 while (!done) { 4716 to_write = 0; 4717 any_event = 0; 4718 4719 popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 4720 ndb_debug("ingester popped %d items\n", popped); 4721 4722 for (i = 0; i < popped; i++) { 4723 msg = &msgs[i]; 4724 if (msg->type == NDB_INGEST_EVENT) { 4725 any_event = 1; 4726 break; 4727 } 4728 } 4729 4730 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 4731 // this is bad 4732 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 4733 mdb_strerror(rc)); 4734 continue; 4735 } 4736 4737 for (i = 0; i < popped; i++) { 4738 msg = &msgs[i]; 4739 switch (msg->type) { 4740 case NDB_INGEST_QUIT: 4741 done = 1; 4742 break; 4743 4744 case NDB_INGEST_EVENT: 4745 out = &outs[to_write]; 4746 if (ndb_ingester_process_event(ctx, ingester, 4747 &msg->event, out, 4748 read_txn)) { 4749 to_write++; 4750 } 4751 } 4752 } 4753 4754 if (any_event) 4755 mdb_txn_abort(read_txn); 4756 4757 if (to_write > 0) { 4758 ndb_debug("pushing %d events to write queue\n", to_write); 4759 if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) { 4760 ndb_debug("failed pushing %d events to write queue\n", to_write); 4761 } 4762 } 4763 } 4764 4765 ndb_debug("quitting ingester thread\n"); 4766 secp256k1_context_destroy(ctx); 4767 return NULL; 4768 } 4769 4770 4771 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, 4772 struct ndb_monitor *monitor, uint32_t ndb_flags) 4773 { 4774 writer->lmdb = lmdb; 4775 writer->monitor = monitor; 4776 writer->ndb_flags = ndb_flags; 4777 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 4778 writer->queue_buf = malloc(writer->queue_buflen); 4779 if (writer->queue_buf == NULL) { 4780 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 4781 return 0; 4782 } 4783 4784 // init the writer queue. 4785 prot_queue_init(&writer->inbox, writer->queue_buf, 4786 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 4787 4788 // spin up the writer thread 4789 if (THREAD_CREATE(writer->thread_id, ndb_writer_thread, writer)) 4790 { 4791 fprintf(stderr, "ndb writer thread failed to create\n"); 4792 return 0; 4793 } 4794 4795 return 1; 4796 } 4797 4798 // initialize the ingester queue and then spawn the thread 4799 static int ndb_ingester_init(struct ndb_ingester *ingester, 4800 struct ndb_lmdb *lmdb, 4801 struct prot_queue *writer_inbox, 4802 const struct ndb_config *config) 4803 { 4804 int elem_size, num_elems; 4805 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 4806 4807 // TODO: configurable queue sizes 4808 elem_size = sizeof(struct ndb_ingester_msg); 4809 num_elems = DEFAULT_QUEUE_SIZE; 4810 4811 ingester->writer_inbox = writer_inbox; 4812 ingester->lmdb = lmdb; 4813 ingester->flags = config->flags; 4814 ingester->filter = config->ingest_filter; 4815 ingester->filter_context = config->filter_context; 4816 4817 if (!threadpool_init(&ingester->tp, config->ingester_threads, 4818 elem_size, num_elems, &quit_msg, ingester, 4819 ndb_ingester_thread)) 4820 { 4821 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 4822 return 0; 4823 } 4824 4825 return 1; 4826 } 4827 4828 static int ndb_writer_destroy(struct ndb_writer *writer) 4829 { 4830 struct ndb_writer_msg msg; 4831 4832 // kill thread 4833 msg.type = NDB_WRITER_QUIT; 4834 ndb_debug("writer: pushing quit message\n"); 4835 if (!prot_queue_push(&writer->inbox, &msg)) { 4836 // queue is too full to push quit message. just kill it. 4837 ndb_debug("writer: terminating thread\n"); 4838 THREAD_TERMINATE(writer->thread_id); 4839 } else { 4840 ndb_debug("writer: joining thread\n"); 4841 THREAD_FINISH(writer->thread_id); 4842 } 4843 4844 // cleanup 4845 ndb_debug("writer: cleaning up protected queue\n"); 4846 prot_queue_destroy(&writer->inbox); 4847 4848 free(writer->queue_buf); 4849 4850 return 1; 4851 } 4852 4853 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 4854 { 4855 threadpool_destroy(&ingester->tp); 4856 return 1; 4857 } 4858 4859 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 4860 { 4861 int rc; 4862 MDB_txn *txn; 4863 4864 if ((rc = mdb_env_create(&lmdb->env))) { 4865 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 4866 return 0; 4867 } 4868 4869 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 4870 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 4871 return 0; 4872 } 4873 4874 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 4875 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 4876 return 0; 4877 } 4878 4879 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 4880 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 4881 return 0; 4882 } 4883 4884 // Initialize DBs 4885 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 4886 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 4887 return 0; 4888 } 4889 4890 // note flatbuffer db 4891 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 4892 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 4893 return 0; 4894 } 4895 4896 // note metadata db 4897 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 4898 fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); 4899 return 0; 4900 } 4901 4902 // profile flatbuffer db 4903 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 4904 fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc); 4905 return 0; 4906 } 4907 4908 // profile search db 4909 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 4910 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 4911 return 0; 4912 } 4913 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 4914 4915 // ndb metadata (db version, etc) 4916 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 4917 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 4918 return 0; 4919 } 4920 4921 // profile last fetches 4922 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 4923 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 4924 return 0; 4925 } 4926 4927 // id+ts index flags 4928 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 4929 4930 // index dbs 4931 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 4932 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 4933 return 0; 4934 } 4935 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 4936 4937 if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 4938 fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc)); 4939 return 0; 4940 } 4941 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare); 4942 4943 if ((rc = mdb_dbi_open(txn, "note_kind", 4944 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4945 &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 4946 fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc)); 4947 return 0; 4948 } 4949 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_ts_compare); 4950 4951 if ((rc = mdb_dbi_open(txn, "note_pubkey", 4952 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4953 &lmdb->dbs[NDB_DB_NOTE_PUBKEY]))) { 4954 fprintf(stderr, "mdb_dbi_open note_pubkey failed: %s\n", mdb_strerror(rc)); 4955 return 0; 4956 } 4957 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY], ndb_tsid_compare); 4958 4959 if ((rc = mdb_dbi_open(txn, "note_pubkey_kind", 4960 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4961 &lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND]))) { 4962 fprintf(stderr, "mdb_dbi_open note_pubkey_kind failed: %s\n", mdb_strerror(rc)); 4963 return 0; 4964 } 4965 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], ndb_id_u64_ts_compare); 4966 4967 if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 4968 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 4969 fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc)); 4970 return 0; 4971 } 4972 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 4973 4974 if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY, 4975 &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) { 4976 fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc)); 4977 return 0; 4978 } 4979 4980 if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, 4981 &lmdb->dbs[NDB_DB_NOTE_TAGS]))) { 4982 fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc)); 4983 return 0; 4984 } 4985 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare); 4986 4987 // Commit the transaction 4988 if ((rc = mdb_txn_commit(txn))) { 4989 fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); 4990 return 0; 4991 } 4992 4993 return 1; 4994 } 4995 4996 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 4997 { 4998 struct ndb_writer_msg msg; 4999 msg.type = NDB_WRITER_DBMETA; 5000 msg.ndb_meta.version = version; 5001 return ndb_writer_queue_msg(&ndb->writer, &msg); 5002 } 5003 5004 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, 5005 void *sub_cb_ctx) 5006 { 5007 monitor->num_subscriptions = 0; 5008 monitor->sub_cb = cb; 5009 monitor->sub_cb_ctx = sub_cb_ctx; 5010 pthread_mutex_init(&monitor->mutex, NULL); 5011 } 5012 5013 void ndb_filter_group_destroy(struct ndb_filter_group *group) 5014 { 5015 struct ndb_filter *filter; 5016 int i; 5017 for (i = 0; i < group->num_filters; i++) { 5018 filter = &group->filters[i]; 5019 ndb_filter_destroy(filter); 5020 } 5021 } 5022 5023 static void ndb_subscription_destroy(struct ndb_subscription *sub) 5024 { 5025 ndb_filter_group_destroy(&sub->group); 5026 // this was malloc'd inside ndb_subscribe 5027 free(sub->inbox.buf); 5028 prot_queue_destroy(&sub->inbox); 5029 sub->subid = 0; 5030 } 5031 5032 static void ndb_monitor_destroy(struct ndb_monitor *monitor) 5033 { 5034 int i; 5035 5036 ndb_monitor_lock(monitor); 5037 5038 for (i = 0; i < monitor->num_subscriptions; i++) { 5039 ndb_subscription_destroy(&monitor->subscriptions[i]); 5040 } 5041 5042 monitor->num_subscriptions = 0; 5043 5044 ndb_monitor_unlock(monitor); 5045 5046 pthread_mutex_destroy(&monitor->mutex); 5047 } 5048 5049 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config) 5050 { 5051 struct ndb *ndb; 5052 //MDB_dbi ind_id; // TODO: ind_pk, etc 5053 5054 ndb = *pndb = calloc(1, sizeof(struct ndb)); 5055 ndb->flags = config->flags; 5056 5057 if (ndb == NULL) { 5058 fprintf(stderr, "ndb_init: malloc failed\n"); 5059 return 0; 5060 } 5061 5062 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 5063 return 0; 5064 5065 ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); 5066 5067 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags)) { 5068 fprintf(stderr, "ndb_writer_init failed\n"); 5069 return 0; 5070 } 5071 5072 if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) { 5073 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 5074 config->ingester_threads); 5075 return 0; 5076 } 5077 5078 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) { 5079 struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE }; 5080 ndb_writer_queue_msg(&ndb->writer, &msg); 5081 } 5082 5083 // Initialize LMDB environment and spin up threads 5084 return 1; 5085 } 5086 5087 void ndb_destroy(struct ndb *ndb) 5088 { 5089 if (ndb == NULL) 5090 return; 5091 5092 // ingester depends on writer and must be destroyed first 5093 ndb_debug("destroying ingester\n"); 5094 ndb_ingester_destroy(&ndb->ingester); 5095 ndb_debug("destroying writer\n"); 5096 ndb_writer_destroy(&ndb->writer); 5097 ndb_debug("destroying monitor\n"); 5098 ndb_monitor_destroy(&ndb->monitor); 5099 5100 ndb_debug("closing env\n"); 5101 mdb_env_close(ndb->lmdb.env); 5102 5103 ndb_debug("ndb destroyed\n"); 5104 free(ndb); 5105 } 5106 5107 // Process a nostr event from a client 5108 // 5109 // ie: ["EVENT", {"content":"..."} ...] 5110 // 5111 // The client-sent variation of ndb_process_event 5112 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 5113 { 5114 return ndb_ingest_event(&ndb->ingester, json, len, 1); 5115 } 5116 5117 // Process anostr event from a relay, 5118 // 5119 // ie: ["EVENT", "subid", {"content":"..."}...] 5120 // 5121 // This function returns as soon as possible, first copying the passed 5122 // json and then queueing it up for processing. Worker threads then take 5123 // the json and process it. 5124 // 5125 // Processing: 5126 // 5127 // 1. The event is parsed into ndb_notes and the signature is validated 5128 // 2. A quick lookup is made on the database to see if we already have 5129 // the note id, if we do we don't need to waste time on json parsing 5130 // or note validation. 5131 // 3. Once validation is done we pass it to the writer queue for writing 5132 // to LMDB. 5133 // 5134 int ndb_process_event(struct ndb *ndb, const char *json, int json_len) 5135 { 5136 return ndb_ingest_event(&ndb->ingester, json, json_len, 0); 5137 } 5138 5139 5140 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client) 5141 { 5142 const char *start, *end, *very_end; 5143 start = ldjson; 5144 end = start + json_len; 5145 very_end = ldjson + json_len; 5146 int (* process)(struct ndb *, const char *, int); 5147 #if DEBUG 5148 int processed = 0; 5149 #endif 5150 process = client ? ndb_process_client_event : ndb_process_event; 5151 5152 while ((end = fast_strchr(start, '\n', very_end - start))) { 5153 //printf("processing '%.*s'\n", (int)(end-start), start); 5154 if (!process(ndb, start, end - start)) { 5155 ndb_debug("ndb_process_client_event failed\n"); 5156 return 0; 5157 } 5158 start = end + 1; 5159 #if DEBUG 5160 processed++; 5161 #endif 5162 } 5163 5164 #if DEBUG 5165 ndb_debug("ndb_process_events: processed %d events\n", processed); 5166 #endif 5167 5168 return 1; 5169 } 5170 5171 #ifndef _WIN32 5172 // TODO: windows 5173 int ndb_process_events_stream(struct ndb *ndb, FILE* fp) 5174 { 5175 char *line = NULL; 5176 size_t len = 0; 5177 ssize_t nread; 5178 5179 while ((nread = getline(&line, &len, fp)) != -1) { 5180 if (line == NULL) 5181 break; 5182 ndb_process_event(ndb, line, len); 5183 } 5184 5185 if (line) 5186 free(line); 5187 5188 return 1; 5189 } 5190 #endif 5191 5192 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) 5193 { 5194 return _ndb_process_events(ndb, ldjson, json_len, 1); 5195 } 5196 5197 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len) 5198 { 5199 return _ndb_process_events(ndb, ldjson, json_len, 0); 5200 } 5201 5202 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag) 5203 { 5204 return cursor_push_u16(cur, tag->count); 5205 } 5206 5207 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, 5208 size_t bufsize) 5209 { 5210 struct ndb_note *note; 5211 int half, size, str_indices_size; 5212 5213 // come on bruh 5214 if (bufsize < sizeof(struct ndb_note) * 2) 5215 return 0; 5216 5217 str_indices_size = bufsize / 32; 5218 size = bufsize - str_indices_size; 5219 half = size / 2; 5220 5221 //debug("size %d half %d str_indices %d\n", size, half, str_indices_size); 5222 5223 // make a safe cursor of our available memory 5224 make_cursor(buf, buf + bufsize, &builder->mem); 5225 5226 note = builder->note = (struct ndb_note *)buf; 5227 5228 // take slices of the memory into subcursors 5229 if (!(cursor_slice(&builder->mem, &builder->note_cur, half) && 5230 cursor_slice(&builder->mem, &builder->strings, half) && 5231 cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) { 5232 return 0; 5233 } 5234 5235 memset(note, 0, sizeof(*note)); 5236 builder->note_cur.p += sizeof(*note); 5237 5238 note->strings = builder->strings.start - buf; 5239 note->version = 1; 5240 5241 return 1; 5242 } 5243 5244 5245 5246 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 5247 const char *json, int json_len, 5248 unsigned char *buf, int bufsize) 5249 { 5250 int half = bufsize / 2; 5251 5252 unsigned char *tok_start = buf + half; 5253 unsigned char *tok_end = buf + bufsize; 5254 5255 p->toks = (jsmntok_t*)tok_start; 5256 p->toks_end = (jsmntok_t*)tok_end; 5257 p->num_tokens = 0; 5258 p->json = json; 5259 p->json_len = json_len; 5260 5261 // ndb_builder gets the first half of the buffer, and jsmn gets the 5262 // second half. I like this way of alloating memory (without actually 5263 // dynamically allocating memory). You get one big chunk upfront and 5264 // then submodules can recursively subdivide it. Maybe you could do 5265 // something even more clever like golden-ratio style subdivision where 5266 // the more important stuff gets a larger chunk and then it spirals 5267 // downward into smaller chunks. Thanks for coming to my TED talk. 5268 5269 if (!ndb_builder_init(&p->builder, buf, half)) 5270 return 0; 5271 5272 jsmn_init(&p->json_parser); 5273 5274 return 1; 5275 } 5276 5277 static inline int ndb_json_parser_parse(struct ndb_json_parser *p, 5278 struct ndb_id_cb *cb) 5279 { 5280 jsmntok_t *tok; 5281 int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); 5282 int res = 5283 jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); 5284 5285 // got an ID! 5286 if (res == -42) { 5287 tok = &p->toks[p->json_parser.toknext-1]; 5288 5289 switch (cb->fn(cb->data, p->json + tok->start)) { 5290 case NDB_IDRES_CONT: 5291 res = jsmn_parse(&p->json_parser, p->json, p->json_len, 5292 p->toks, cap, 0); 5293 break; 5294 case NDB_IDRES_STOP: 5295 return -42; 5296 } 5297 } else if (res == 0) { 5298 return 0; 5299 } 5300 5301 p->num_tokens = res; 5302 p->i = 0; 5303 5304 return 1; 5305 } 5306 5307 static inline int toksize(jsmntok_t *tok) 5308 { 5309 return tok->end - tok->start; 5310 } 5311 5312 5313 5314 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2) 5315 { 5316 switch (c2) { 5317 case 't': return cursor_push_byte(cur, '\t'); 5318 case 'n': return cursor_push_byte(cur, '\n'); 5319 case 'r': return cursor_push_byte(cur, '\r'); 5320 case 'b': return cursor_push_byte(cur, '\b'); 5321 case 'f': return cursor_push_byte(cur, '\f'); 5322 case '\\': return cursor_push_byte(cur, '\\'); 5323 case '/': return cursor_push_byte(cur, '/'); 5324 case '"': return cursor_push_byte(cur, '"'); 5325 case 'u': 5326 // these aren't handled yet 5327 return 0; 5328 default: 5329 return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2); 5330 } 5331 } 5332 5333 static int cursor_push_escaped_char(struct cursor *cur, char c) 5334 { 5335 switch (c) { 5336 case '"': return cursor_push_str(cur, "\\\""); 5337 case '\\': return cursor_push_str(cur, "\\\\"); 5338 case '\b': return cursor_push_str(cur, "\\b"); 5339 case '\f': return cursor_push_str(cur, "\\f"); 5340 case '\n': return cursor_push_str(cur, "\\n"); 5341 case '\r': return cursor_push_str(cur, "\\r"); 5342 case '\t': return cursor_push_str(cur, "\\t"); 5343 // TODO: \u hex hex hex hex 5344 } 5345 return cursor_push_byte(cur, c); 5346 } 5347 5348 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len) 5349 { 5350 int i; 5351 5352 if (len % 2 != 0) 5353 return 0; 5354 5355 if (!cursor_push_byte(cur, '"')) 5356 return 0; 5357 5358 for (i = 0; i < len; i++) { 5359 unsigned int c = ((const unsigned char *)buf)[i]; 5360 if (!cursor_push_byte(cur, hexchar(c >> 4))) 5361 return 0; 5362 if (!cursor_push_byte(cur, hexchar(c & 0xF))) 5363 return 0; 5364 } 5365 5366 if (!cursor_push_byte(cur, '"')) 5367 return 0; 5368 5369 return 1; 5370 } 5371 5372 static int cursor_push_jsonstr(struct cursor *cur, const char *str) 5373 { 5374 int i; 5375 int len; 5376 5377 len = strlen(str); 5378 5379 if (!cursor_push_byte(cur, '"')) 5380 return 0; 5381 5382 for (i = 0; i < len; i++) { 5383 if (!cursor_push_escaped_char(cur, str[i])) 5384 return 0; 5385 } 5386 5387 if (!cursor_push_byte(cur, '"')) 5388 return 0; 5389 5390 return 1; 5391 } 5392 5393 5394 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str) 5395 { 5396 if (str.flag == NDB_PACKED_ID) 5397 return cursor_push_hex_str(cur, str.id, 32); 5398 5399 return cursor_push_jsonstr(cur, str.str); 5400 } 5401 5402 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note, 5403 struct ndb_tag *tag) 5404 { 5405 int i; 5406 5407 if (!cursor_push_byte(cur, '[')) 5408 return 0; 5409 5410 for (i = 0; i < tag->count; i++) { 5411 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i))) 5412 return 0; 5413 if (i != tag->count-1 && !cursor_push_byte(cur, ',')) 5414 return 0; 5415 } 5416 5417 return cursor_push_byte(cur, ']'); 5418 } 5419 5420 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 5421 { 5422 int i; 5423 struct ndb_iterator iter, *it = &iter; 5424 ndb_tags_iterate_start(note, it); 5425 5426 if (!cursor_push_byte(cur, '[')) 5427 return 0; 5428 5429 i = 0; 5430 while (ndb_tags_iterate_next(it)) { 5431 if (!cursor_push_json_tag(cur, note, it->tag)) 5432 return 0; 5433 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 5434 return 0; 5435 i++; 5436 } 5437 5438 if (!cursor_push_byte(cur, ']')) 5439 return 0; 5440 5441 return 1; 5442 } 5443 5444 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen) 5445 { 5446 char timebuf[16] = {0}; 5447 char kindbuf[16] = {0}; 5448 char pubkey[65]; 5449 struct cursor cur; 5450 int ok; 5451 5452 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey)) 5453 return 0; 5454 5455 make_cursor(buf, buf + buflen, &cur); 5456 5457 // TODO: update in 2106 ... 5458 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 5459 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 5460 5461 ok = 5462 cursor_push_str(&cur, "[0,\"") && 5463 cursor_push_str(&cur, pubkey) && 5464 cursor_push_str(&cur, "\",") && 5465 cursor_push_str(&cur, timebuf) && 5466 cursor_push_str(&cur, ",") && 5467 cursor_push_str(&cur, kindbuf) && 5468 cursor_push_str(&cur, ",") && 5469 cursor_push_json_tags(&cur, ev) && 5470 cursor_push_str(&cur, ",") && 5471 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 5472 cursor_push_str(&cur, "]"); 5473 5474 if (!ok) 5475 return 0; 5476 5477 return cur.p - cur.start; 5478 } 5479 5480 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len) 5481 { 5482 int i; 5483 unsigned char chr; 5484 if (c->p + (len * 2) >= c->end) 5485 return 0; 5486 5487 for (i = 0; i < len; i++) { 5488 chr = bytes[i]; 5489 5490 *(c->p++) = hexchar(chr >> 4); 5491 *(c->p++) = hexchar(chr & 0xF); 5492 } 5493 5494 return 1; 5495 } 5496 5497 static int cursor_push_int_str(struct cursor *c, uint64_t num) 5498 { 5499 char timebuf[16] = {0}; 5500 snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num); 5501 return cursor_push_str(c, timebuf); 5502 } 5503 5504 int ndb_note_json(struct ndb_note *note, char *buf, int buflen) 5505 { 5506 struct cursor cur, *c = &cur; 5507 5508 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur); 5509 5510 int ok = cursor_push_str(c, "{\"id\":\"") && 5511 cursor_push_hex(c, ndb_note_id(note), 32) && 5512 cursor_push_str(c, "\",\"pubkey\":\"") && 5513 cursor_push_hex(c, ndb_note_pubkey(note), 32) && 5514 cursor_push_str(c, "\",\"created_at\":") && 5515 cursor_push_int_str(c, ndb_note_created_at(note)) && 5516 cursor_push_str(c, ",\"kind\":") && 5517 cursor_push_int_str(c, ndb_note_kind(note)) && 5518 cursor_push_str(c, ",\"tags\":") && 5519 cursor_push_json_tags(c, note) && 5520 cursor_push_str(c, ",\"content\":") && 5521 cursor_push_jsonstr(c, ndb_note_content(note)) && 5522 cursor_push_str(c, ",\"sig\":\"") && 5523 cursor_push_hex(c, ndb_note_sig(note), 64) && 5524 cursor_push_c_str(c, "\"}"); 5525 5526 if (!ok) { 5527 return 0; 5528 } 5529 5530 return cur.p - cur.start; 5531 } 5532 5533 static int cursor_push_json_elem_array(struct cursor *cur, 5534 const struct ndb_filter *filter, 5535 struct ndb_filter_elements *elems) 5536 { 5537 int i; 5538 unsigned char *id; 5539 const char *str; 5540 uint64_t val; 5541 5542 if (!cursor_push_byte(cur, '[')) 5543 return 0; 5544 5545 for (i = 0; i < elems->count; i++) { 5546 5547 switch (elems->field.elem_type) { 5548 case NDB_ELEMENT_STRING: 5549 str = ndb_filter_get_string_element(filter, elems, i); 5550 if (!cursor_push_jsonstr(cur, str)) 5551 return 0; 5552 break; 5553 case NDB_ELEMENT_ID: 5554 id = ndb_filter_get_id_element(filter, elems, i); 5555 if (!cursor_push_hex_str(cur, id, 32)) 5556 return 0; 5557 break; 5558 case NDB_ELEMENT_INT: 5559 val = ndb_filter_get_int_element(elems, i); 5560 if (!cursor_push_int_str(cur, val)) 5561 return 0; 5562 break; 5563 case NDB_ELEMENT_UNKNOWN: 5564 ndb_debug("unknown element in cursor_push_json_elem_array"); 5565 return 0; 5566 } 5567 5568 if (i != elems->count-1) { 5569 if (!cursor_push_byte(cur, ',')) 5570 return 0; 5571 } 5572 } 5573 5574 if (!cursor_push_str(cur, "]")) 5575 return 0; 5576 5577 return 1; 5578 } 5579 5580 int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen) 5581 { 5582 const char *str; 5583 struct cursor cur, *c = &cur; 5584 struct ndb_filter_elements *elems; 5585 int i; 5586 5587 if (!filter->finalized) { 5588 ndb_debug("filter not finalized in ndb_filter_json\n"); 5589 return 0; 5590 } 5591 5592 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c); 5593 5594 if (!cursor_push_str(c, "{")) 5595 return 0; 5596 5597 for (i = 0; i < filter->num_elements; i++) { 5598 elems = ndb_filter_get_elements(filter, i); 5599 switch (elems->field.type) { 5600 case NDB_FILTER_IDS: 5601 if (!cursor_push_str(c, "\"ids\":")) 5602 return 0; 5603 if (!cursor_push_json_elem_array(c, filter, elems)) 5604 return 0; 5605 break; 5606 case NDB_FILTER_SEARCH: 5607 if (!cursor_push_str(c, "\"search\":")) 5608 return 0; 5609 if (!(str = ndb_filter_get_string_element(filter, elems, 0))) 5610 return 0; 5611 if (!cursor_push_jsonstr(c, str)) 5612 return 0; 5613 break; 5614 case NDB_FILTER_AUTHORS: 5615 if (!cursor_push_str(c, "\"authors\":")) 5616 return 0; 5617 if (!cursor_push_json_elem_array(c, filter, elems)) 5618 return 0; 5619 break; 5620 case NDB_FILTER_KINDS: 5621 if (!cursor_push_str(c, "\"kinds\":")) 5622 return 0; 5623 if (!cursor_push_json_elem_array(c, filter, elems)) 5624 return 0; 5625 break; 5626 case NDB_FILTER_TAGS: 5627 if (!cursor_push_str(c, "\"#")) 5628 return 0; 5629 if (!cursor_push_byte(c, elems->field.tag)) 5630 return 0; 5631 if (!cursor_push_str(c, "\":")) 5632 return 0; 5633 if (!cursor_push_json_elem_array(c, filter, elems)) 5634 return 0; 5635 break; 5636 case NDB_FILTER_SINCE: 5637 if (!cursor_push_str(c, "\"since\":")) 5638 return 0; 5639 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5640 return 0; 5641 break; 5642 case NDB_FILTER_UNTIL: 5643 if (!cursor_push_str(c, "\"until\":")) 5644 return 0; 5645 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5646 return 0; 5647 break; 5648 case NDB_FILTER_LIMIT: 5649 if (!cursor_push_str(c, "\"limit\":")) 5650 return 0; 5651 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5652 return 0; 5653 break; 5654 } 5655 5656 if (i != filter->num_elements-1) { 5657 if (!cursor_push_byte(c, ',')) { 5658 return 0; 5659 } 5660 } 5661 5662 } 5663 5664 if (!cursor_push_c_str(c, "}")) 5665 return 0; 5666 5667 return cur.p - cur.start; 5668 } 5669 5670 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 5671 int len; 5672 5673 if (!(len = ndb_event_commitment(note, buf, buflen))) 5674 return 0; 5675 5676 //fprintf(stderr, "%.*s\n", len, buf); 5677 5678 sha256((struct sha256*)note->id, buf, len); 5679 5680 return 1; 5681 } 5682 5683 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 5684 unsigned char sig[64]) 5685 { 5686 unsigned char aux[32]; 5687 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 5688 5689 if (!fill_random(aux, sizeof(aux))) 5690 return 0; 5691 5692 secp256k1_context *ctx = 5693 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 5694 5695 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 5696 } 5697 5698 int ndb_create_keypair(struct ndb_keypair *kp) 5699 { 5700 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 5701 secp256k1_xonly_pubkey pubkey; 5702 5703 secp256k1_context *ctx = 5704 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 5705 5706 /* Try to create a keypair with a valid context, it should only 5707 * fail if the secret key is zero or out of range. */ 5708 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 5709 return 0; 5710 5711 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 5712 return 0; 5713 5714 /* Serialize the public key. Should always return 1 for a valid public key. */ 5715 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 5716 } 5717 5718 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 5719 { 5720 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 5721 fprintf(stderr, "could not hex decode secret key\n"); 5722 return 0; 5723 } 5724 5725 return ndb_create_keypair(keypair); 5726 } 5727 5728 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 5729 struct ndb_keypair *keypair) 5730 { 5731 int strings_len = builder->strings.p - builder->strings.start; 5732 unsigned char *note_end = builder->note_cur.p + strings_len; 5733 int total_size = note_end - builder->note_cur.start; 5734 5735 // move the strings buffer next to the end of our ndb_note 5736 memmove(builder->note_cur.p, builder->strings.start, strings_len); 5737 5738 // set the strings location 5739 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 5740 5741 // record the total size 5742 //builder->note->size = total_size; 5743 5744 *note = builder->note; 5745 5746 // generate id and sign if we're building this manually 5747 if (keypair) { 5748 // use the remaining memory for building our id buffer 5749 unsigned char *end = builder->mem.end; 5750 unsigned char *start = (unsigned char*)(*note) + total_size; 5751 5752 ndb_builder_set_pubkey(builder, keypair->pubkey); 5753 5754 if (!ndb_calculate_id(builder->note, start, end - start)) 5755 return 0; 5756 5757 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 5758 return 0; 5759 } 5760 5761 // make sure we're aligned as a whole 5762 total_size = (total_size + 7) & ~7; 5763 assert((total_size % 8) == 0); 5764 return total_size; 5765 } 5766 5767 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 5768 { 5769 return builder->note; 5770 } 5771 5772 static union ndb_packed_str ndb_offset_str(uint32_t offset) 5773 { 5774 // ensure accidents like -1 don't corrupt our packed_str 5775 union ndb_packed_str str; 5776 // most significant byte is reserved for ndb_packtype 5777 str.offset = offset & 0xFFFFFF; 5778 return str; 5779 } 5780 5781 5782 /// find an existing string via str_indices. these indices only exist in the 5783 /// builder phase just for this purpose. 5784 static inline int ndb_builder_find_str(struct ndb_builder *builder, 5785 const char *str, int len, 5786 union ndb_packed_str *pstr) 5787 { 5788 // find existing matching string to avoid duplicate strings 5789 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 5790 for (int i = 0; i < indices; i++) { 5791 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 5792 const char *some_str = (const char*)builder->strings.start + index; 5793 5794 if (!memcmp(some_str, str, len) && some_str[len] == '\0') { 5795 // found an existing matching str, use that index 5796 *pstr = ndb_offset_str(index); 5797 return 1; 5798 } 5799 } 5800 5801 return 0; 5802 } 5803 5804 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 5805 int len, union ndb_packed_str *pstr) 5806 { 5807 uint32_t loc; 5808 5809 // no string found, push a new one 5810 loc = builder->strings.p - builder->strings.start; 5811 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 5812 cursor_push_byte(&builder->strings, '\0'))) { 5813 return 0; 5814 } 5815 5816 *pstr = ndb_offset_str(loc); 5817 5818 // record in builder indices. ignore return value, if we can't cache it 5819 // then whatever 5820 cursor_push_u32(&builder->str_indices, loc); 5821 5822 return 1; 5823 } 5824 5825 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 5826 unsigned char *id, 5827 union ndb_packed_str *pstr) 5828 { 5829 // Don't both find id duplicates. very rarely are they duplicated 5830 // and it slows things down quite a bit. If we really care about this 5831 // We can switch to a hash table. 5832 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 5833 // pstr->packed.flag = NDB_PACKED_ID; 5834 // return 1; 5835 //} 5836 5837 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 5838 pstr->packed.flag = NDB_PACKED_ID; 5839 return 1; 5840 } 5841 5842 return 0; 5843 } 5844 5845 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2) 5846 { 5847 union ndb_packed_str str; 5848 str.packed.flag = NDB_PACKED_STR; 5849 str.packed.str[0] = c1; 5850 str.packed.str[1] = c2; 5851 str.packed.str[2] = '\0'; 5852 return str; 5853 } 5854 5855 static union ndb_packed_str ndb_char_to_packed_str(char c) 5856 { 5857 union ndb_packed_str str; 5858 str.packed.flag = NDB_PACKED_STR; 5859 str.packed.str[0] = c; 5860 str.packed.str[1] = '\0'; 5861 return str; 5862 } 5863 5864 5865 /// Check for small strings to pack 5866 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 5867 const char *str, int len, 5868 union ndb_packed_str *pstr, 5869 int pack_ids) 5870 { 5871 unsigned char id_buf[32]; 5872 5873 if (len == 0) { 5874 *pstr = ndb_char_to_packed_str(0); 5875 return 1; 5876 } else if (len == 1) { 5877 *pstr = ndb_char_to_packed_str(str[0]); 5878 return 1; 5879 } else if (len == 2) { 5880 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 5881 return 1; 5882 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 5883 return ndb_builder_push_packed_id(builder, id_buf, pstr); 5884 } 5885 5886 return 0; 5887 } 5888 5889 5890 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 5891 const char *str, int len, 5892 union ndb_packed_str *pstr) 5893 { 5894 if (ndb_builder_find_str(builder, str, len, pstr)) 5895 return 1; 5896 5897 return ndb_builder_push_str(builder, str, len, pstr); 5898 } 5899 5900 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 5901 union ndb_packed_str *pstr, int pack_ids) 5902 { 5903 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5904 return 1; 5905 5906 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 5907 } 5908 5909 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 5910 int len) 5911 { 5912 int pack_ids = 0; 5913 builder->note->content_length = len; 5914 return ndb_builder_make_str(builder, content, len, 5915 &builder->note->content, pack_ids); 5916 } 5917 5918 5919 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 5920 const char *s) 5921 { 5922 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 5923 memcmp(json + tok->start, s, tok_len) == 0) { 5924 return 1; 5925 } 5926 return 0; 5927 } 5928 5929 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 5930 union ndb_packed_str offset) 5931 { 5932 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 5933 return 0; 5934 builder->current_tag->count++; 5935 return 1; 5936 } 5937 5938 /// Unescape and push json strings 5939 static int ndb_builder_make_json_str(struct ndb_builder *builder, 5940 const char *str, int len, 5941 union ndb_packed_str *pstr, 5942 int *written, int pack_ids) 5943 { 5944 // let's not care about de-duping these. we should just unescape 5945 // in-place directly into the strings table. 5946 if (written) 5947 *written = len; 5948 5949 const char *p, *end, *start; 5950 unsigned char *builder_start; 5951 5952 // always try compact strings first 5953 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5954 return 1; 5955 5956 end = str + len; 5957 start = str; // Initialize start to the beginning of the string 5958 5959 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 5960 builder_start = builder->strings.p; 5961 5962 for (p = str; p < end; p++) { 5963 if (*p == '\\' && p+1 < end) { 5964 // Push the chunk of unescaped characters before this escape sequence 5965 if (start < p && !cursor_push(&builder->strings, 5966 (unsigned char *)start, 5967 p - start)) { 5968 return 0; 5969 } 5970 5971 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 5972 return 0; 5973 5974 p++; // Skip the character following the backslash 5975 start = p + 1; // Update the start pointer to the next character 5976 } 5977 } 5978 5979 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 5980 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 5981 p - start)) { 5982 return 0; 5983 } 5984 5985 if (written) 5986 *written = builder->strings.p - builder_start; 5987 5988 // TODO: dedupe these!? 5989 return cursor_push_byte(&builder->strings, '\0'); 5990 } 5991 5992 static int ndb_builder_push_json_tag(struct ndb_builder *builder, 5993 const char *str, int len) 5994 { 5995 union ndb_packed_str pstr; 5996 int pack_ids = 1; 5997 if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids)) 5998 return 0; 5999 return ndb_builder_finalize_tag(builder, pstr); 6000 } 6001 6002 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag 6003 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p, 6004 jsmntok_t *array) 6005 { 6006 jsmntok_t *str_tok; 6007 const char *str; 6008 6009 if (array->size == 0) 6010 return 0; 6011 6012 if (!ndb_builder_new_tag(&p->builder)) 6013 return 0; 6014 6015 for (int i = 0; i < array->size; i++) { 6016 str_tok = &array[i+1]; 6017 str = p->json + str_tok->start; 6018 6019 if (!ndb_builder_push_json_tag(&p->builder, str, 6020 toksize(str_tok))) { 6021 return 0; 6022 } 6023 } 6024 6025 return 1; 6026 } 6027 6028 // Push json tags into ndb data 6029 // [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags 6030 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p, 6031 jsmntok_t *array) 6032 { 6033 jsmntok_t *tag = array; 6034 6035 if (array->size == 0) 6036 return 1; 6037 6038 for (int i = 0; i < array->size; i++) { 6039 if (!ndb_builder_tag_from_json_array(p, &tag[i+1])) 6040 return 0; 6041 tag += tag[i+1].size; 6042 } 6043 6044 return 1; 6045 } 6046 6047 static int parse_unsigned_int(const char *start, int len, unsigned int *num) 6048 { 6049 unsigned int number = 0; 6050 const char *p = start, *end = start + len; 6051 int digits = 0; 6052 6053 while (p < end) { 6054 char c = *p; 6055 6056 if (c < '0' || c > '9') 6057 break; 6058 6059 // Check for overflow 6060 char digit = c - '0'; 6061 if (number > (UINT_MAX - digit) / 10) 6062 return 0; // Overflow detected 6063 6064 number = number * 10 + digit; 6065 6066 p++; 6067 digits++; 6068 } 6069 6070 if (digits == 0) 6071 return 0; 6072 6073 *num = number; 6074 return 1; 6075 } 6076 6077 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, 6078 unsigned char *buf, int bufsize, struct ndb_id_cb *cb) 6079 { 6080 jsmntok_t *tok = NULL; 6081 int tok_len, res; 6082 struct ndb_json_parser parser; 6083 struct ndb_event *ev = &fce->event; 6084 6085 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6086 6087 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 6088 return res; 6089 6090 if (parser.toks[0].type == JSMN_OBJECT) { 6091 ndb_debug("got raw json in client_event_from_json\n"); 6092 fce->evtype = NDB_FCE_EVENT; 6093 return ndb_parse_json_note(&parser, &ev->note); 6094 } 6095 6096 if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY) 6097 return 0; 6098 6099 parser.i = 1; 6100 tok = &parser.toks[parser.i++]; 6101 tok_len = toksize(tok); 6102 if (tok->type != JSMN_STRING) 6103 return 0; 6104 6105 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 6106 fce->evtype = NDB_FCE_EVENT; 6107 return ndb_parse_json_note(&parser, &ev->note); 6108 } 6109 6110 return 0; 6111 } 6112 6113 6114 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, 6115 unsigned char *buf, int bufsize, 6116 struct ndb_id_cb *cb) 6117 { 6118 jsmntok_t *tok = NULL; 6119 int tok_len, res; 6120 struct ndb_json_parser parser; 6121 struct ndb_event *ev = &tce->event; 6122 6123 tce->subid_len = 0; 6124 tce->subid = ""; 6125 6126 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6127 6128 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 6129 return res; 6130 6131 if (parser.toks[0].type == JSMN_OBJECT) { 6132 ndb_debug("got raw json in ws_event_from_json\n"); 6133 tce->evtype = NDB_TCE_EVENT; 6134 return ndb_parse_json_note(&parser, &ev->note); 6135 } 6136 6137 if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) 6138 return 0; 6139 6140 parser.i = 1; 6141 tok = &parser.toks[parser.i++]; 6142 tok_len = toksize(tok); 6143 if (tok->type != JSMN_STRING) 6144 return 0; 6145 6146 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 6147 tce->evtype = NDB_TCE_EVENT; 6148 6149 tok = &parser.toks[parser.i++]; 6150 if (tok->type != JSMN_STRING) 6151 return 0; 6152 6153 tce->subid = json + tok->start; 6154 tce->subid_len = toksize(tok); 6155 6156 return ndb_parse_json_note(&parser, &ev->note); 6157 } else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 6158 tce->evtype = NDB_TCE_EOSE; 6159 6160 tok = &parser.toks[parser.i++]; 6161 if (tok->type != JSMN_STRING) 6162 return 0; 6163 6164 tce->subid = json + tok->start; 6165 tce->subid_len = toksize(tok); 6166 return 1; 6167 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 6168 if (parser.num_tokens != 5) 6169 return 0; 6170 6171 struct ndb_command_result *cr = &tce->command_result; 6172 6173 tce->evtype = NDB_TCE_OK; 6174 6175 tok = &parser.toks[parser.i++]; 6176 if (tok->type != JSMN_STRING) 6177 return 0; 6178 6179 tce->subid = json + tok->start; 6180 tce->subid_len = toksize(tok); 6181 6182 tok = &parser.toks[parser.i++]; 6183 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 6184 return 0; 6185 6186 cr->ok = (json + tok->start)[0] == 't'; 6187 6188 tok = &parser.toks[parser.i++]; 6189 if (tok->type != JSMN_STRING) 6190 return 0; 6191 6192 tce->command_result.msg = json + tok->start; 6193 tce->command_result.msglen = toksize(tok); 6194 6195 return 1; 6196 } else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) { 6197 tce->evtype = NDB_TCE_AUTH; 6198 6199 tok = &parser.toks[parser.i++]; 6200 if (tok->type != JSMN_STRING) 6201 return 0; 6202 6203 tce->subid = json + tok->start; 6204 tce->subid_len = toksize(tok); 6205 6206 return 1; 6207 } 6208 6209 return 0; 6210 } 6211 6212 static enum ndb_filter_fieldtype 6213 ndb_filter_parse_field(const char *tok, int len, char *tagchar) 6214 { 6215 *tagchar = 0; 6216 6217 if (len == 0) 6218 return 0; 6219 6220 if (len == 7 && !strncmp(tok, "authors", 7)) { 6221 return NDB_FILTER_AUTHORS; 6222 } else if (len == 3 && !strncmp(tok, "ids", 3)) { 6223 return NDB_FILTER_IDS; 6224 } else if (len == 5 && !strncmp(tok, "kinds", 5)) { 6225 return NDB_FILTER_KINDS; 6226 } else if (len == 2 && tok[0] == '#') { 6227 *tagchar = tok[1]; 6228 return NDB_FILTER_TAGS; 6229 } else if (len == 5 && !strncmp(tok, "since", 5)) { 6230 return NDB_FILTER_SINCE; 6231 } else if (len == 5 && !strncmp(tok, "until", 5)) { 6232 return NDB_FILTER_UNTIL; 6233 } else if (len == 5 && !strncmp(tok, "limit", 5)) { 6234 return NDB_FILTER_LIMIT; 6235 } else if (len == 6 && !strncmp(tok, "search", 6)) { 6236 return NDB_FILTER_SEARCH; 6237 } 6238 6239 return 0; 6240 } 6241 6242 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser, 6243 struct ndb_filter *filter) 6244 { 6245 jsmntok_t *tok; 6246 const char *start; 6247 unsigned char hexbuf[32]; 6248 int tok_len, i, size; 6249 6250 tok = &parser->toks[parser->i++]; 6251 6252 if (tok->type != JSMN_ARRAY) { 6253 ndb_debug("parse_json_ids: not an array\n"); 6254 return 0; 6255 } 6256 6257 size = tok->size; 6258 6259 for (i = 0; i < size; parser->i++, i++) { 6260 tok = &parser->toks[parser->i]; 6261 start = parser->json + tok->start; 6262 tok_len = toksize(tok); 6263 6264 if (tok->type != JSMN_STRING) { 6265 ndb_debug("parse_json_ids: not a string '%d'\n", tok->type); 6266 return 0; 6267 } 6268 6269 if (tok_len != 64) { 6270 ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start); 6271 return 0; 6272 } 6273 6274 // id 6275 if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) { 6276 ndb_debug("parse_json_ids: hex decode failed\n"); 6277 return 0; 6278 } 6279 6280 ndb_debug("adding id elem\n"); 6281 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6282 ndb_debug("parse_json_ids: failed to add id element\n"); 6283 return 0; 6284 } 6285 } 6286 6287 parser->i--; 6288 return 1; 6289 } 6290 6291 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser, 6292 struct ndb_filter *filter) 6293 { 6294 jsmntok_t *tok; 6295 const char *start; 6296 int tok_len; 6297 unsigned char hexbuf[32]; 6298 enum ndb_generic_element_type typ; 6299 tok = NULL; 6300 int i, size; 6301 6302 tok = &parser->toks[parser->i++]; 6303 6304 if (tok->type != JSMN_ARRAY) 6305 return 0; 6306 6307 size = tok->size; 6308 6309 for (i = 0; i < size; i++, parser->i++) { 6310 tok = &parser->toks[parser->i]; 6311 start = parser->json + tok->start; 6312 tok_len = toksize(tok); 6313 6314 if (tok->type != JSMN_STRING) 6315 return 0; 6316 6317 if (i == 0) { 6318 if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) { 6319 typ = NDB_ELEMENT_ID; 6320 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6321 ndb_debug("failed to push id elem\n"); 6322 return 0; 6323 } 6324 } else { 6325 typ = NDB_ELEMENT_STRING; 6326 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6327 return 0; 6328 } 6329 } else if (typ == NDB_ELEMENT_ID) { 6330 if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf))) 6331 return 0; 6332 if (!ndb_filter_add_id_element(filter, hexbuf)) 6333 return 0; 6334 } else if (typ == NDB_ELEMENT_STRING) { 6335 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6336 return 0; 6337 } else { 6338 // ??? 6339 return 0; 6340 } 6341 } 6342 6343 parser->i--; 6344 return 1; 6345 } 6346 6347 static int ndb_filter_parse_json_str(struct ndb_json_parser *parser, 6348 struct ndb_filter *filter) 6349 { 6350 jsmntok_t *tok; 6351 const char *start; 6352 int tok_len; 6353 6354 tok = &parser->toks[parser->i]; 6355 start = parser->json + tok->start; 6356 tok_len = toksize(tok); 6357 6358 if (tok->type != JSMN_STRING) 6359 return 0; 6360 6361 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6362 return 0; 6363 6364 ndb_debug("added str elem '%.*s'\n", tok_len, start); 6365 6366 return 1; 6367 } 6368 6369 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser, 6370 struct ndb_filter *filter) 6371 { 6372 jsmntok_t *tok; 6373 const char *start; 6374 int tok_len; 6375 unsigned int value; 6376 6377 tok = &parser->toks[parser->i]; 6378 start = parser->json + tok->start; 6379 tok_len = toksize(tok); 6380 6381 if (tok->type != JSMN_PRIMITIVE) 6382 return 0; 6383 6384 if (!parse_unsigned_int(start, tok_len, &value)) 6385 return 0; 6386 6387 if (!ndb_filter_add_int_element(filter, (uint64_t)value)) 6388 return 0; 6389 6390 ndb_debug("added int elem %d\n", value); 6391 6392 return 1; 6393 } 6394 6395 6396 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser, 6397 struct ndb_filter *filter) 6398 { 6399 jsmntok_t *tok; 6400 int size, i; 6401 6402 tok = &parser->toks[parser->i++]; 6403 6404 if (tok->type != JSMN_ARRAY) 6405 return 0; 6406 6407 size = tok->size; 6408 6409 for (i = 0; i < size; parser->i++, i++) { 6410 if (!ndb_filter_parse_json_int(parser, filter)) 6411 return 0; 6412 } 6413 6414 parser->i--; 6415 return 1; 6416 } 6417 6418 6419 static int ndb_filter_parse_json(struct ndb_json_parser *parser, 6420 struct ndb_filter *filter) 6421 { 6422 jsmntok_t *tok = NULL; 6423 const char *json = parser->json; 6424 const char *start; 6425 char tag; 6426 int tok_len; 6427 enum ndb_filter_fieldtype field; 6428 6429 if (parser->toks[parser->i++].type != JSMN_OBJECT) 6430 return 0; 6431 6432 for (; parser->i < parser->num_tokens; parser->i++) { 6433 tok = &parser->toks[parser->i++]; 6434 start = json + tok->start; 6435 tok_len = toksize(tok); 6436 6437 if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) { 6438 ndb_debug("failed field '%.*s'\n", tok_len, start); 6439 continue; 6440 } 6441 6442 if (tag) { 6443 ndb_debug("starting tag field '%c'\n", tag); 6444 if (!ndb_filter_start_tag_field(filter, tag)) { 6445 ndb_debug("failed to start tag field '%c'\n", tag); 6446 return 0; 6447 } 6448 } else if (!ndb_filter_start_field(filter, field)) { 6449 ndb_debug("field already started\n"); 6450 return 0; 6451 } 6452 6453 // we parsed a top-level field 6454 switch(field) { 6455 case NDB_FILTER_AUTHORS: 6456 case NDB_FILTER_IDS: 6457 if (!ndb_filter_parse_json_ids(parser, filter)) { 6458 ndb_debug("failed to parse filter ids/authors\n"); 6459 return 0; 6460 } 6461 break; 6462 case NDB_FILTER_SEARCH: 6463 if (!ndb_filter_parse_json_str(parser, filter)) { 6464 ndb_debug("failed to parse filter search str\n"); 6465 return 0; 6466 } 6467 break; 6468 case NDB_FILTER_SINCE: 6469 case NDB_FILTER_UNTIL: 6470 case NDB_FILTER_LIMIT: 6471 if (!ndb_filter_parse_json_int(parser, filter)) { 6472 ndb_debug("failed to parse filter since/until/limit\n"); 6473 return 0; 6474 } 6475 break; 6476 case NDB_FILTER_KINDS: 6477 if (!ndb_filter_parse_json_ints(parser, filter)) { 6478 ndb_debug("failed to parse filter kinds\n"); 6479 return 0; 6480 } 6481 break; 6482 case NDB_FILTER_TAGS: 6483 if (!ndb_filter_parse_json_elems(parser, filter)) { 6484 ndb_debug("failed to parse filter tags\n"); 6485 return 0; 6486 } 6487 break; 6488 } 6489 6490 ndb_filter_end_field(filter); 6491 } 6492 6493 return ndb_filter_end(filter); 6494 } 6495 6496 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 6497 { 6498 jsmntok_t *tok = NULL; 6499 unsigned char hexbuf[64]; 6500 const char *json = parser->json; 6501 const char *start; 6502 int i, tok_len, parsed; 6503 6504 parsed = 0; 6505 6506 if (parser->toks[parser->i].type != JSMN_OBJECT) 6507 return 0; 6508 6509 // TODO: build id buffer and verify at end 6510 6511 for (i = parser->i + 1; i < parser->num_tokens; i++) { 6512 tok = &parser->toks[i]; 6513 start = json + tok->start; 6514 tok_len = toksize(tok); 6515 6516 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 6517 if (tok_len == 0 || i + 1 >= parser->num_tokens) 6518 continue; 6519 6520 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 6521 // pubkey 6522 tok = &parser->toks[i+1]; 6523 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6524 parsed |= NDB_PARSED_PUBKEY; 6525 ndb_builder_set_pubkey(&parser->builder, hexbuf); 6526 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 6527 // id 6528 tok = &parser->toks[i+1]; 6529 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6530 parsed |= NDB_PARSED_ID; 6531 ndb_builder_set_id(&parser->builder, hexbuf); 6532 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 6533 // sig 6534 tok = &parser->toks[i+1]; 6535 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6536 parsed |= NDB_PARSED_SIG; 6537 ndb_builder_set_sig(&parser->builder, hexbuf); 6538 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 6539 // kind 6540 tok = &parser->toks[i+1]; 6541 start = json + tok->start; 6542 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6543 return 0; 6544 if (!parse_unsigned_int(start, toksize(tok), 6545 &parser->builder.note->kind)) 6546 return 0; 6547 parsed |= NDB_PARSED_KIND; 6548 } else if (start[0] == 'c') { 6549 if (jsoneq(json, tok, tok_len, "created_at")) { 6550 // created_at 6551 tok = &parser->toks[i+1]; 6552 start = json + tok->start; 6553 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6554 return 0; 6555 // TODO: update to int64 in 2106 ... xD 6556 unsigned int bigi; 6557 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 6558 return 0; 6559 parser->builder.note->created_at = bigi; 6560 parsed |= NDB_PARSED_CREATED_AT; 6561 } else if (jsoneq(json, tok, tok_len, "content")) { 6562 // content 6563 tok = &parser->toks[i+1]; 6564 union ndb_packed_str pstr; 6565 tok_len = toksize(tok); 6566 int written, pack_ids = 0; 6567 if (!ndb_builder_make_json_str(&parser->builder, 6568 json + tok->start, 6569 tok_len, &pstr, 6570 &written, pack_ids)) { 6571 ndb_debug("ndb_builder_make_json_str failed\n"); 6572 return 0; 6573 } 6574 parser->builder.note->content_length = written; 6575 parser->builder.note->content = pstr; 6576 parsed |= NDB_PARSED_CONTENT; 6577 } 6578 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 6579 tok = &parser->toks[i+1]; 6580 ndb_builder_process_json_tags(parser, tok); 6581 i += tok->size; 6582 parsed |= NDB_PARSED_TAGS; 6583 } 6584 } 6585 6586 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 6587 if (parsed != NDB_PARSED_ALL) 6588 return 0; 6589 6590 return ndb_builder_finalize(&parser->builder, note, NULL); 6591 } 6592 6593 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter, 6594 unsigned char *buf, int bufsize) 6595 { 6596 struct ndb_json_parser parser; 6597 int res; 6598 6599 if (filter->finalized) 6600 return 0; 6601 6602 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6603 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6604 return res; 6605 6606 if (parser.num_tokens < 1) 6607 return 0; 6608 6609 return ndb_filter_parse_json(&parser, filter); 6610 } 6611 6612 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 6613 unsigned char *buf, int bufsize) 6614 { 6615 struct ndb_json_parser parser; 6616 int res; 6617 6618 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6619 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6620 return res; 6621 6622 if (parser.num_tokens < 1) 6623 return 0; 6624 6625 return ndb_parse_json_note(&parser, note); 6626 } 6627 6628 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 6629 { 6630 memcpy(builder->note->pubkey, pubkey, 32); 6631 } 6632 6633 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 6634 { 6635 memcpy(builder->note->id, id, 32); 6636 } 6637 6638 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 6639 { 6640 memcpy(builder->note->sig, sig, 64); 6641 } 6642 6643 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 6644 { 6645 builder->note->kind = kind; 6646 } 6647 6648 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 6649 { 6650 builder->note->created_at = created_at; 6651 } 6652 6653 int ndb_builder_new_tag(struct ndb_builder *builder) 6654 { 6655 builder->note->tags.count++; 6656 struct ndb_tag tag = {0}; 6657 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 6658 return cursor_push_tag(&builder->note_cur, &tag); 6659 } 6660 6661 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 6662 { 6663 counts->count = 0; 6664 counts->key_size = 0; 6665 counts->value_size = 0; 6666 } 6667 6668 static void ndb_stat_init(struct ndb_stat *stat) 6669 { 6670 // init stats 6671 int i; 6672 6673 for (i = 0; i < NDB_CKIND_COUNT; i++) { 6674 ndb_stat_counts_init(&stat->common_kinds[i]); 6675 } 6676 6677 for (i = 0; i < NDB_DBS; i++) { 6678 ndb_stat_counts_init(&stat->dbs[i]); 6679 } 6680 6681 ndb_stat_counts_init(&stat->other_kinds); 6682 } 6683 6684 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 6685 { 6686 int rc; 6687 MDB_cursor *cur; 6688 MDB_val k, v; 6689 MDB_dbi db; 6690 struct ndb_txn txn; 6691 struct ndb_note *note; 6692 int i; 6693 enum ndb_common_kind common_kind; 6694 6695 // initialize to 0 6696 ndb_stat_init(stat); 6697 6698 if (!ndb_begin_query(ndb, &txn)) { 6699 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 6700 return 0; 6701 } 6702 6703 // stat each dbi in the database 6704 for (i = 0; i < NDB_DBS; i++) 6705 { 6706 db = ndb->lmdb.dbs[i]; 6707 6708 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 6709 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 6710 mdb_strerror(rc)); 6711 return 0; 6712 } 6713 6714 // loop over every entry and count kv sizes 6715 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6716 // we gather more detailed per-kind stats if we're in 6717 // the notes db 6718 if (i == NDB_DB_NOTE) { 6719 note = v.mv_data; 6720 common_kind = ndb_kind_to_common_kind(note->kind); 6721 6722 // uncommon kind? just count them in bulk 6723 if ((int)common_kind == -1) { 6724 stat->other_kinds.count++; 6725 stat->other_kinds.key_size += k.mv_size; 6726 stat->other_kinds.value_size += v.mv_size; 6727 } else { 6728 stat->common_kinds[common_kind].count++; 6729 stat->common_kinds[common_kind].key_size += k.mv_size; 6730 stat->common_kinds[common_kind].value_size += v.mv_size; 6731 } 6732 } 6733 6734 stat->dbs[i].count++; 6735 stat->dbs[i].key_size += k.mv_size; 6736 stat->dbs[i].value_size += v.mv_size; 6737 } 6738 6739 // close the cursor, they are per-dbi 6740 mdb_cursor_close(cur); 6741 } 6742 6743 ndb_end_query(&txn); 6744 6745 return 1; 6746 } 6747 6748 /// Push an element to the current tag 6749 /// 6750 /// Basic idea is to call ndb_builder_new_tag 6751 int ndb_builder_push_tag_str(struct ndb_builder *builder, 6752 const char *str, int len) 6753 { 6754 union ndb_packed_str pstr; 6755 int pack_ids = 1; 6756 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 6757 return 0; 6758 return ndb_builder_finalize_tag(builder, pstr); 6759 } 6760 6761 // 6762 // CONFIG 6763 // 6764 void ndb_default_config(struct ndb_config *config) 6765 { 6766 int cores = get_cpu_cores(); 6767 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 6768 config->ingester_threads = cores == -1 ? 4 : cores; 6769 config->flags = 0; 6770 config->ingest_filter = NULL; 6771 config->filter_context = NULL; 6772 config->sub_cb_ctx = NULL; 6773 config->sub_cb = NULL; 6774 } 6775 6776 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) 6777 { 6778 config->sub_cb_ctx = context; 6779 config->sub_cb = fn; 6780 } 6781 6782 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 6783 { 6784 config->ingester_threads = threads; 6785 } 6786 6787 void ndb_config_set_flags(struct ndb_config *config, int flags) 6788 { 6789 config->flags = flags; 6790 } 6791 6792 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 6793 { 6794 config->mapsize = mapsize; 6795 } 6796 6797 void ndb_config_set_ingest_filter(struct ndb_config *config, 6798 ndb_ingest_filter_fn fn, void *filter_ctx) 6799 { 6800 config->ingest_filter = fn; 6801 config->filter_context = filter_ctx; 6802 } 6803 6804 int ndb_print_tag_index(struct ndb_txn *txn) 6805 { 6806 MDB_cursor *cur; 6807 MDB_val k, v; 6808 int i; 6809 6810 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur)) 6811 return 0; 6812 6813 i = 1; 6814 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6815 printf("%d ", i); 6816 print_tag_kv(txn, &k, &v); 6817 i++; 6818 } 6819 6820 return 1; 6821 } 6822 6823 int ndb_print_kind_keys(struct ndb_txn *txn) 6824 { 6825 MDB_cursor *cur; 6826 MDB_val k, v; 6827 int i; 6828 struct ndb_u64_ts *tsid; 6829 6830 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur)) 6831 return 0; 6832 6833 i = 1; 6834 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6835 tsid = k.mv_data; 6836 printf("%d note_kind %" PRIu64 " %" PRIu64 "\n", 6837 i, tsid->u64, tsid->timestamp); 6838 6839 i++; 6840 } 6841 6842 return 1; 6843 } 6844 6845 // used by ndb.c 6846 int ndb_print_search_keys(struct ndb_txn *txn) 6847 { 6848 MDB_cursor *cur; 6849 MDB_val k, v; 6850 int i; 6851 struct ndb_text_search_key search_key; 6852 6853 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 6854 return 0; 6855 6856 i = 1; 6857 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6858 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 6859 fprintf(stderr, "error decoding key %d\n", i); 6860 continue; 6861 } 6862 6863 ndb_print_text_search_key(k.mv_size, &search_key); 6864 printf("\n"); 6865 6866 i++; 6867 } 6868 6869 return 1; 6870 } 6871 6872 struct ndb_tags *ndb_note_tags(struct ndb_note *note) 6873 { 6874 return ¬e->tags; 6875 } 6876 6877 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr) 6878 { 6879 struct ndb_str str; 6880 str.flag = pstr->packed.flag; 6881 6882 if (str.flag == NDB_PACKED_STR) { 6883 str.str = pstr->packed.str; 6884 return str; 6885 } 6886 6887 str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF); 6888 return str; 6889 } 6890 6891 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) 6892 { 6893 return ndb_note_str(note, &tag->strs[ind]); 6894 } 6895 6896 int ndb_str_len(struct ndb_str *str) 6897 { 6898 if (str->flag == NDB_PACKED_ID) 6899 return 32; 6900 return strlen(str->str); 6901 } 6902 6903 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind) 6904 { 6905 return ndb_tag_str(iter->note, iter->tag, ind); 6906 } 6907 6908 unsigned char * ndb_note_id(struct ndb_note *note) 6909 { 6910 return note->id; 6911 } 6912 6913 unsigned char * ndb_note_pubkey(struct ndb_note *note) 6914 { 6915 return note->pubkey; 6916 } 6917 6918 unsigned char * ndb_note_sig(struct ndb_note *note) 6919 { 6920 return note->sig; 6921 } 6922 6923 uint32_t ndb_note_created_at(struct ndb_note *note) 6924 { 6925 return note->created_at; 6926 } 6927 6928 uint32_t ndb_note_kind(struct ndb_note *note) 6929 { 6930 return note->kind; 6931 } 6932 6933 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind) 6934 { 6935 note->kind = kind; 6936 } 6937 6938 const char *ndb_note_content(struct ndb_note *note) 6939 { 6940 return ndb_note_str(note, ¬e->content).str; 6941 } 6942 6943 uint32_t ndb_note_content_length(struct ndb_note *note) 6944 { 6945 return note->content_length; 6946 } 6947 6948 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes) 6949 { 6950 struct ndb_note *note = (struct ndb_note *)bytes; 6951 if (note->version != 1) 6952 return 0; 6953 return note; 6954 } 6955 6956 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) 6957 { 6958 iter->note = note; 6959 iter->tag = NULL; 6960 iter->index = -1; 6961 } 6962 6963 // Helper function to get a pointer to the nth tag 6964 static struct ndb_tag *ndb_tags_tag(struct ndb_tags *tags, size_t index) { 6965 return (struct ndb_tag *)((uint8_t *)tags + sizeof(struct ndb_tags) + index * sizeof(struct ndb_tag)); 6966 } 6967 6968 int ndb_tags_iterate_next(struct ndb_iterator *iter) 6969 { 6970 struct ndb_tags *tags; 6971 6972 if (iter->tag == NULL || iter->index == -1) { 6973 iter->tag = ndb_tags_tag(&iter->note->tags, 0); 6974 iter->index = 0; 6975 return iter->note->tags.count != 0; 6976 } 6977 6978 tags = &iter->note->tags; 6979 6980 if (++iter->index < tags->count) { 6981 uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); 6982 iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size); 6983 return 1; 6984 } 6985 6986 return 0; 6987 } 6988 6989 uint16_t ndb_tags_count(struct ndb_tags *tags) 6990 { 6991 return tags->count; 6992 } 6993 6994 uint16_t ndb_tag_count(struct ndb_tag *tags) 6995 { 6996 return tags->count; 6997 } 6998 6999 enum ndb_common_kind ndb_kind_to_common_kind(int kind) 7000 { 7001 switch (kind) 7002 { 7003 case 0: return NDB_CKIND_PROFILE; 7004 case 1: return NDB_CKIND_TEXT; 7005 case 3: return NDB_CKIND_CONTACTS; 7006 case 4: return NDB_CKIND_DM; 7007 case 5: return NDB_CKIND_DELETE; 7008 case 6: return NDB_CKIND_REPOST; 7009 case 7: return NDB_CKIND_REACTION; 7010 case 9735: return NDB_CKIND_ZAP; 7011 case 9734: return NDB_CKIND_ZAP_REQUEST; 7012 case 23194: return NDB_CKIND_NWC_REQUEST; 7013 case 23195: return NDB_CKIND_NWC_RESPONSE; 7014 case 27235: return NDB_CKIND_HTTP_AUTH; 7015 case 30000: return NDB_CKIND_LIST; 7016 case 30023: return NDB_CKIND_LONGFORM; 7017 case 30315: return NDB_CKIND_STATUS; 7018 } 7019 7020 return -1; 7021 } 7022 7023 const char *ndb_kind_name(enum ndb_common_kind ck) 7024 { 7025 switch (ck) { 7026 case NDB_CKIND_PROFILE: return "profile"; 7027 case NDB_CKIND_TEXT: return "text"; 7028 case NDB_CKIND_CONTACTS: return "contacts"; 7029 case NDB_CKIND_DM: return "dm"; 7030 case NDB_CKIND_DELETE: return "delete"; 7031 case NDB_CKIND_REPOST: return "repost"; 7032 case NDB_CKIND_REACTION: return "reaction"; 7033 case NDB_CKIND_ZAP: return "zap"; 7034 case NDB_CKIND_ZAP_REQUEST: return "zap_request"; 7035 case NDB_CKIND_NWC_REQUEST: return "nwc_request"; 7036 case NDB_CKIND_NWC_RESPONSE: return "nwc_response"; 7037 case NDB_CKIND_HTTP_AUTH: return "http_auth"; 7038 case NDB_CKIND_LIST: return "list"; 7039 case NDB_CKIND_LONGFORM: return "longform"; 7040 case NDB_CKIND_STATUS: return "status"; 7041 case NDB_CKIND_COUNT: return "unknown"; 7042 } 7043 7044 return "unknown"; 7045 } 7046 7047 const char *ndb_db_name(enum ndb_dbs db) 7048 { 7049 switch (db) { 7050 case NDB_DB_NOTE: 7051 return "note"; 7052 case NDB_DB_META: 7053 return "note_metadata"; 7054 case NDB_DB_PROFILE: 7055 return "profile"; 7056 case NDB_DB_NOTE_ID: 7057 return "note_index"; 7058 case NDB_DB_PROFILE_PK: 7059 return "profile_pubkey_index"; 7060 case NDB_DB_NDB_META: 7061 return "nostrdb_metadata"; 7062 case NDB_DB_PROFILE_SEARCH: 7063 return "profile_search"; 7064 case NDB_DB_PROFILE_LAST_FETCH: 7065 return "profile_last_fetch"; 7066 case NDB_DB_NOTE_KIND: 7067 return "note_kind_index"; 7068 case NDB_DB_NOTE_TEXT: 7069 return "note_fulltext"; 7070 case NDB_DB_NOTE_BLOCKS: 7071 return "note_blocks"; 7072 case NDB_DB_NOTE_TAGS: 7073 return "note_tags"; 7074 case NDB_DB_NOTE_PUBKEY: 7075 return "note_pubkey_index"; 7076 case NDB_DB_NOTE_PUBKEY_KIND: 7077 return "note_pubkey_kind_index"; 7078 case NDB_DBS: 7079 return "count"; 7080 } 7081 7082 return "unknown"; 7083 } 7084 7085 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note) 7086 { 7087 const char *content; 7088 size_t content_len; 7089 struct ndb_blocks *blocks; 7090 unsigned char *buffer; 7091 7092 content = ndb_note_content(note); 7093 content_len = ndb_note_content_length(note); 7094 7095 // something weird is going on 7096 if (content_len >= INT32_MAX) 7097 return NULL; 7098 7099 uint32_t buf_size = 2<<18; 7100 7101 buffer = malloc(buf_size); // Not carefully calculated, but ok because we will not need this once we switch to the local relay model 7102 if (!buffer) 7103 return NULL; 7104 7105 if (!ndb_parse_content(buffer, buf_size, content, content_len, &blocks)) { 7106 free(buffer); 7107 return NULL; 7108 } 7109 7110 //blocks = realloc(blocks, ndb_blocks_total_size(blocks)); 7111 //if (blocks == NULL) 7112 //return NULL; 7113 7114 blocks->flags |= NDB_BLOCK_FLAG_OWNED; 7115 7116 return blocks; 7117 } 7118 7119 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key) 7120 { 7121 struct ndb_blocks *blocks, *blocks_to_writer; 7122 size_t blocks_size; 7123 struct ndb_note *note; 7124 size_t note_len; 7125 7126 if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, ¬e_len))) { 7127 return blocks; 7128 } 7129 7130 // If we don't have note blocks, let's lazily generate them. This is 7131 // migration-friendly instead of doing them all at once 7132 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_len))) { 7133 // no note found, can't return note blocks 7134 return NULL; 7135 } 7136 7137 if (!(blocks = ndb_note_to_blocks(note))) 7138 return NULL; 7139 7140 // send a copy to the writer 7141 blocks_size = ndb_blocks_total_size(blocks); 7142 blocks_to_writer = malloc(blocks_size); 7143 memcpy(blocks_to_writer, blocks, blocks_size); 7144 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED); 7145 7146 // we generated new blocks, let's store them in the DB 7147 struct ndb_writer_blocks write_blocks = { 7148 .blocks = blocks_to_writer, 7149 .note_key = note_key 7150 }; 7151 7152 assert(write_blocks.blocks != blocks); 7153 7154 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS }; 7155 msg.blocks = write_blocks; 7156 7157 ndb_writer_queue_msg(&ndb->writer, &msg); 7158 7159 return blocks; 7160 } 7161 7162 // please call ndb_monitor_lock before calling this 7163 static struct ndb_subscription * 7164 ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index) 7165 { 7166 struct ndb_subscription *sub, *tsub; 7167 int i; 7168 7169 for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) { 7170 tsub = &monitor->subscriptions[i]; 7171 if (tsub->subid == subid) { 7172 sub = tsub; 7173 if (index) 7174 *index = i; 7175 break; 7176 } 7177 } 7178 7179 return sub; 7180 } 7181 7182 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 7183 int note_id_capacity) 7184 { 7185 struct ndb_subscription *sub; 7186 int res; 7187 7188 if (subid == 0) 7189 return 0; 7190 7191 ndb_monitor_lock(&ndb->monitor); 7192 7193 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) 7194 res = 0; 7195 else 7196 res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); 7197 7198 ndb_monitor_unlock(&ndb->monitor); 7199 7200 return res; 7201 } 7202 7203 int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 7204 int note_id_capacity) 7205 { 7206 struct ndb_subscription *sub; 7207 struct prot_queue *queue_inbox; 7208 7209 // this is not a valid subscription id 7210 if (subid == 0) 7211 return 0; 7212 7213 ndb_monitor_lock(&ndb->monitor); 7214 7215 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) { 7216 ndb_monitor_unlock(&ndb->monitor); 7217 return 0; 7218 } 7219 7220 queue_inbox = &sub->inbox; 7221 7222 ndb_monitor_unlock(&ndb->monitor); 7223 7224 // there is technically a race condition if the thread yeilds at this 7225 // comment and a subscription is added/removed. A deadlock in the 7226 // writer queue would be much worse though. This function is dubious 7227 // anyways. 7228 7229 return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity); 7230 } 7231 7232 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) 7233 { 7234 struct ndb_subscription *sub; 7235 int index, res, elems_to_move; 7236 7237 ndb_monitor_lock(&ndb->monitor); 7238 7239 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) { 7240 res = 0; 7241 goto done; 7242 } 7243 7244 ndb_subscription_destroy(sub); 7245 7246 elems_to_move = (--ndb->monitor.num_subscriptions) - index; 7247 7248 memmove(&ndb->monitor.subscriptions[index], 7249 &ndb->monitor.subscriptions[index+1], 7250 elems_to_move * sizeof(*sub)); 7251 7252 res = 1; 7253 7254 done: 7255 ndb_monitor_unlock(&ndb->monitor); 7256 7257 return res; 7258 } 7259 7260 int ndb_num_subscriptions(struct ndb *ndb) 7261 { 7262 return ndb->monitor.num_subscriptions; 7263 } 7264 7265 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters) 7266 { 7267 static uint64_t subids = 0; 7268 struct ndb_subscription *sub; 7269 size_t buflen; 7270 uint64_t subid; 7271 char *buf; 7272 7273 ndb_monitor_lock(&ndb->monitor); 7274 7275 if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) { 7276 fprintf(stderr, "too many subscriptions\n"); 7277 subid = 0; 7278 goto done; 7279 } 7280 7281 sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions]; 7282 subid = ++subids; 7283 sub->subid = subid; 7284 7285 ndb_filter_group_init(&sub->group); 7286 if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) { 7287 subid = 0; 7288 goto done; 7289 } 7290 7291 // 500k ought to be enough for anyone 7292 buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE; 7293 buf = malloc(buflen); 7294 7295 if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) { 7296 fprintf(stderr, "failed to push prot queue\n"); 7297 subid = 0; 7298 goto done; 7299 } 7300 7301 ndb->monitor.num_subscriptions++; 7302 done: 7303 ndb_monitor_unlock(&ndb->monitor); 7304 7305 return subid; 7306 }