nostrdb.c (180614B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "ccan/crypto/sha256/sha256.h" 8 #include "bolt11/bolt11.h" 9 #include "bolt11/amount.h" 10 #include "lmdb.h" 11 #include "util.h" 12 #include "cpu.h" 13 #include "block.h" 14 #include "threadpool.h" 15 #include "thread.h" 16 #include "protected_queue.h" 17 #include "memchr.h" 18 #include "print_util.h" 19 #include <stdlib.h> 20 #include <limits.h> 21 #include <assert.h> 22 23 #include "bindings/c/profile_json_parser.h" 24 #include "bindings/c/profile_builder.h" 25 #include "bindings/c/meta_builder.h" 26 #include "bindings/c/meta_reader.h" 27 #include "bindings/c/profile_verifier.h" 28 #include "secp256k1.h" 29 #include "secp256k1_ecdh.h" 30 #include "secp256k1_schnorrsig.h" 31 32 #define max(a,b) ((a) > (b) ? (a) : (b)) 33 #define min(a,b) ((a) < (b) ? (a) : (b)) 34 35 // the maximum number of things threads pop and push in bulk 36 #define THREAD_QUEUE_BATCH 4096 37 38 // maximum number of active subscriptions 39 #define MAX_SUBSCRIPTIONS 256 40 #define MAX_SCAN_CURSORS 12 41 #define MAX_FILTERS 16 42 43 // the maximum size of inbox queues 44 static const int DEFAULT_QUEUE_SIZE = 32768; 45 46 // increase if we need bigger filters 47 #define NDB_FILTER_PAGES 64 48 49 #define ndb_flag_set(flags, f) ((flags & f) == f) 50 51 #define NDB_PARSED_ID (1 << 0) 52 #define NDB_PARSED_PUBKEY (1 << 1) 53 #define NDB_PARSED_SIG (1 << 2) 54 #define NDB_PARSED_CREATED_AT (1 << 3) 55 #define NDB_PARSED_KIND (1 << 4) 56 #define NDB_PARSED_CONTENT (1 << 5) 57 #define NDB_PARSED_TAGS (1 << 6) 58 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 59 60 typedef int (*ndb_migrate_fn)(struct ndb_txn *); 61 typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, 62 int word_index); 63 64 // these must be byte-aligned, they are directly accessing the serialized data 65 // representation 66 #pragma pack(push, 1) 67 68 union ndb_packed_str { 69 struct { 70 char str[3]; 71 // we assume little endian everywhere. sorry not sorry. 72 unsigned char flag; // NDB_PACKED_STR, etc 73 } packed; 74 75 uint32_t offset; 76 unsigned char bytes[4]; 77 }; 78 79 struct ndb_tag { 80 uint16_t count; 81 union ndb_packed_str strs[0]; 82 }; 83 84 struct ndb_tags { 85 uint16_t padding; 86 uint16_t count; 87 }; 88 89 // v1 90 struct ndb_note { 91 unsigned char version; // v=1 92 unsigned char padding[3]; // keep things aligned 93 unsigned char id[32]; 94 unsigned char pubkey[32]; 95 unsigned char sig[64]; 96 97 uint64_t created_at; 98 uint32_t kind; 99 uint32_t content_length; 100 union ndb_packed_str content; 101 uint32_t strings; 102 // nothing can come after tags since it contains variadic data 103 struct ndb_tags tags; 104 }; 105 106 #pragma pack(pop) 107 108 109 struct ndb_migration { 110 ndb_migrate_fn fn; 111 }; 112 113 struct ndb_profile_record_builder { 114 flatcc_builder_t *builder; 115 void *flatbuf; 116 }; 117 118 // controls whether to continue or stop the json parser 119 enum ndb_idres { 120 NDB_IDRES_CONT, 121 NDB_IDRES_STOP, 122 }; 123 124 // closure data for the id-detecting ingest controller 125 struct ndb_ingest_controller 126 { 127 MDB_txn *read_txn; 128 struct ndb_lmdb *lmdb; 129 }; 130 131 enum ndb_writer_msgtype { 132 NDB_WRITER_QUIT, // kill thread immediately 133 NDB_WRITER_NOTE, // write a note to the db 134 NDB_WRITER_PROFILE, // write a profile to the db 135 NDB_WRITER_DBMETA, // write ndb metadata 136 NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched 137 NDB_WRITER_BLOCKS, // write parsed note blocks 138 NDB_WRITER_MIGRATE, // migrate the database 139 }; 140 141 // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) 142 enum ndb_meta_key { 143 NDB_META_KEY_VERSION = 1 144 }; 145 146 struct ndb_json_parser { 147 const char *json; 148 int json_len; 149 struct ndb_builder builder; 150 jsmn_parser json_parser; 151 jsmntok_t *toks, *toks_end; 152 int i; 153 int num_tokens; 154 }; 155 156 // useful to pass to threads on its own 157 struct ndb_lmdb { 158 MDB_env *env; 159 MDB_dbi dbs[NDB_DBS]; 160 }; 161 162 struct ndb_writer { 163 struct ndb_lmdb *lmdb; 164 struct ndb_monitor *monitor; 165 166 uint32_t ndb_flags; 167 void *queue_buf; 168 int queue_buflen; 169 pthread_t thread_id; 170 171 struct prot_queue inbox; 172 }; 173 174 struct ndb_ingester { 175 struct ndb_lmdb *lmdb; 176 uint32_t flags; 177 struct threadpool tp; 178 struct prot_queue *writer_inbox; 179 void *filter_context; 180 ndb_ingest_filter_fn filter; 181 }; 182 183 struct ndb_filter_group { 184 struct ndb_filter filters[MAX_FILTERS]; 185 int num_filters; 186 }; 187 188 struct ndb_subscription { 189 uint64_t subid; 190 struct ndb_filter_group group; 191 struct prot_queue inbox; 192 }; 193 194 struct ndb_monitor { 195 struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS]; 196 ndb_sub_fn sub_cb; 197 void *sub_cb_ctx; 198 int num_subscriptions; 199 200 // monitor isn't a full inbox. We want pollers to be able to poll 201 // subscriptions efficiently without going through a message queue, so 202 // we use a simple mutex here. 203 pthread_mutex_t mutex; 204 }; 205 206 struct ndb { 207 struct ndb_lmdb lmdb; 208 struct ndb_ingester ingester; 209 struct ndb_monitor monitor; 210 struct ndb_writer writer; 211 int version; 212 uint32_t flags; // setting flags 213 // lmdb environ handles, etc 214 }; 215 216 /// 217 /// Query Plans 218 /// 219 /// There are general strategies for performing certain types of query 220 /// depending on the filter. For example, for large contact list queries 221 /// with many authors, we simply do a descending scan on created_at 222 /// instead of doing 1000s of pubkey scans. 223 /// 224 /// Query plans are calculated from filters via `ndb_filter_plan` 225 /// 226 enum ndb_query_plan { 227 NDB_PLAN_KINDS, 228 NDB_PLAN_IDS, 229 NDB_PLAN_AUTHORS, 230 NDB_PLAN_AUTHOR_KINDS, 231 NDB_PLAN_CREATED, 232 NDB_PLAN_TAGS, 233 NDB_PLAN_SEARCH, 234 }; 235 236 // A id + u64 + timestamp 237 struct ndb_id_u64_ts { 238 unsigned char id[32]; // pubkey, id, etc 239 uint64_t u64; // kind, etc 240 uint64_t timestamp; 241 }; 242 243 // A clustered key with an id and a timestamp 244 struct ndb_tsid { 245 unsigned char id[32]; 246 uint64_t timestamp; 247 }; 248 249 // A u64 + timestamp id. Just using this for kinds at the moment. 250 struct ndb_u64_ts { 251 uint64_t u64; // kind, etc 252 uint64_t timestamp; 253 }; 254 255 struct ndb_word 256 { 257 const char *word; 258 int word_len; 259 }; 260 261 struct ndb_search_words 262 { 263 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 264 int num_words; 265 }; 266 267 268 static inline int is_replaceable_kind(uint64_t kind) 269 { 270 return kind == 0 || kind == 3 271 || (10000 <= kind && kind < 20000) 272 || (30000 <= kind && kind < 40000); 273 } 274 275 276 // ndb_text_search_key 277 // 278 // This is compressed when in lmdb: 279 // 280 // note_id: varint 281 // strlen: varint 282 // str: cstr 283 // timestamp: varint 284 // word_index: varint 285 // 286 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 287 int word_index, int word_len, const char *str, 288 uint64_t timestamp, uint64_t note_id, 289 int *keysize) 290 { 291 struct cursor cur; 292 make_cursor(buf, buf + bufsize, &cur); 293 294 // TODO: need update this to uint64_t 295 // we push this first because our query function can pull this off 296 // quickly to check matches 297 if (!cursor_push_varint(&cur, (int32_t)note_id)) 298 return 0; 299 300 // string length 301 if (!cursor_push_varint(&cur, word_len)) 302 return 0; 303 304 // non-null terminated, lowercase string 305 if (!cursor_push_lowercase(&cur, str, word_len)) 306 return 0; 307 308 // TODO: need update this to uint64_t 309 if (!cursor_push_varint(&cur, (int)timestamp)) 310 return 0; 311 312 // the index of the word in the content so that we can do more accurate 313 // phrase searches 314 if (!cursor_push_varint(&cur, word_index)) 315 return 0; 316 317 // pad to 8-byte alignment 318 if (!cursor_align(&cur, 8)) 319 return 0; 320 321 *keysize = cur.p - cur.start; 322 assert((*keysize % 8) == 0); 323 324 return 1; 325 } 326 327 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 328 int wordlen, const char *word, 329 uint64_t timestamp, uint64_t note_id, 330 int *keysize) 331 { 332 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 333 timestamp, note_id, keysize); 334 } 335 336 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 337 int wordlen, const char *word, 338 uint64_t since, 339 int *keysize) 340 { 341 uint64_t note_id; 342 note_id = 0; 343 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 344 since, note_id, keysize); 345 } 346 347 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 348 int wordlen, const char *word, 349 uint64_t until, 350 int *keysize) 351 { 352 uint64_t note_id; 353 note_id = INT32_MAX; 354 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 355 until, note_id, keysize); 356 } 357 358 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, uint64_t timestamp, int *keysize); 359 360 /** From LMDB: Compare two items lexically */ 361 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 362 int diff; 363 ssize_t len_diff; 364 unsigned int len; 365 366 len = a->mv_size; 367 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 368 if (len_diff > 0) { 369 len = b->mv_size; 370 len_diff = 1; 371 } 372 373 diff = memcmp(a->mv_data, b->mv_data, len); 374 return diff ? diff : len_diff<0 ? -1 : len_diff; 375 } 376 377 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) 378 { 379 MDB_val va, vb; 380 uint64_t ts_a, ts_b; 381 int cmp; 382 383 va.mv_data = a->mv_data; 384 va.mv_size = a->mv_size - 8; 385 386 vb.mv_data = b->mv_data; 387 vb.mv_size = b->mv_size - 8; 388 389 if ((cmp = mdb_cmp_memn(&va, &vb))) 390 return cmp; 391 392 ts_a = *(uint64_t*)((unsigned char *)va.mv_data + va.mv_size); 393 ts_b = *(uint64_t*)((unsigned char *)vb.mv_data + vb.mv_size); 394 395 if (ts_a < ts_b) 396 return -1; 397 else if (ts_a > ts_b) 398 return 1; 399 400 return 0; 401 } 402 403 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 404 { 405 struct cursor ca, cb; 406 uint64_t sa, sb, nid_a, nid_b; 407 MDB_val a2, b2; 408 409 make_cursor(a->mv_data, (unsigned char *)a->mv_data + a->mv_size, &ca); 410 make_cursor(b->mv_data, (unsigned char *)b->mv_data + b->mv_size, &cb); 411 412 // note_id 413 if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b))) 414 return 0; 415 416 // string size 417 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 418 return 0; 419 420 a2.mv_data = ca.p; 421 a2.mv_size = sa; 422 423 b2.mv_data = cb.p; 424 b2.mv_size = sb; 425 426 int cmp = mdb_cmp_memn(&a2, &b2); 427 if (cmp) return cmp; 428 429 // skip over string 430 ca.p += sa; 431 cb.p += sb; 432 433 // timestamp 434 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 435 return 0; 436 437 if (sa < sb) return -1; 438 else if (sa > sb) return 1; 439 440 // note_id 441 if (nid_a < nid_b) return -1; 442 else if (nid_a > nid_b) return 1; 443 444 // word index 445 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 446 return 0; 447 448 if (sa < sb) return -1; 449 else if (sa > sb) return 1; 450 451 return 0; 452 } 453 454 static inline int ndb_unpack_text_search_key_noteid( 455 struct cursor *cur, uint64_t *note_id) 456 { 457 if (!cursor_pull_varint(cur, note_id)) 458 return 0; 459 460 return 1; 461 } 462 463 // faster peek of just the string instead of unpacking everything 464 // this is used to quickly discard range query matches if there is no 465 // common prefix 466 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 467 const char **str, 468 int *str_len) 469 { 470 uint64_t len; 471 472 if (!cursor_pull_varint(cur, &len)) 473 return 0; 474 475 *str_len = len; 476 477 *str = (const char *)cur->p; 478 479 if (!cursor_skip(cur, *str_len)) 480 return 0; 481 482 return 1; 483 } 484 485 // should be called after ndb_unpack_text_search_key_string. It continues 486 // the unpacking of a text search key if we've already started it. 487 static inline int 488 ndb_unpack_remaining_text_search_key(struct cursor *cur, 489 struct ndb_text_search_key *key) 490 { 491 if (!cursor_pull_varint(cur, &key->timestamp)) 492 return 0; 493 494 if (!cursor_pull_varint(cur, &key->word_index)) 495 return 0; 496 497 return 1; 498 } 499 500 // unpack a fulltext search key 501 // 502 // full version of string + unpack remaining. This is split up because text 503 // searching only requires to pull the string for prefix searching, and the 504 // remaining is optional 505 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 506 struct ndb_text_search_key *key) 507 { 508 struct cursor c; 509 make_cursor(p, p + len, &c); 510 511 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 512 return 0; 513 514 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 515 return 0; 516 517 return ndb_unpack_remaining_text_search_key(&c, key); 518 } 519 520 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 521 // `dst` and `src` are pointers to the destination and source strings, respectively. 522 // `n` is the maximum number of characters to copy. 523 static void lowercase_strncpy(char *dst, const char *src, int n) { 524 int j = 0, i = 0; 525 526 if (!dst || !src || n == 0) { 527 return; 528 } 529 530 while (src[i] != '\0' && j < n) { 531 dst[j++] = tolower(src[i++]); 532 } 533 534 // Null-terminate and fill the destination string 535 while (j < n) { 536 dst[j++] = '\0'; 537 } 538 } 539 540 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) { 541 return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID; 542 } 543 544 // Copy the filter 545 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src) 546 { 547 size_t src_size, elem_size, data_size; 548 549 memcpy(dst, src, sizeof(*src)); 550 551 elem_size = src->elem_buf.end - src->elem_buf.start; 552 data_size = src->data_buf.end - src->data_buf.start; 553 src_size = data_size + elem_size; 554 555 // let's only allow finalized filters to be cloned 556 if (!src || !src->finalized) 557 return 0; 558 559 dst->elem_buf.start = malloc(src_size); 560 dst->elem_buf.end = dst->elem_buf.start + elem_size; 561 dst->elem_buf.p = dst->elem_buf.end; 562 563 dst->data_buf.start = dst->elem_buf.start + elem_size; 564 dst->data_buf.end = dst->data_buf.start + data_size; 565 dst->data_buf.p = dst->data_buf.end; 566 567 if (dst->elem_buf.start == NULL) 568 return 0; 569 570 memcpy(dst->elem_buf.start, src->elem_buf.start, src_size); 571 572 return 1; 573 } 574 575 // "Finalize" the filter. This resizes the allocated heap buffers so that they 576 // are as small as possible. This also prevents new fields from being added 577 int ndb_filter_end(struct ndb_filter *filter) 578 { 579 #ifdef NDB_LOG 580 size_t orig_size; 581 #endif 582 size_t data_len, elem_len; 583 if (filter->finalized == 1) 584 return 0; 585 586 // move the data buffer to the end of the element buffer and update 587 // all of the element pointers accordingly 588 data_len = filter->data_buf.p - filter->data_buf.start; 589 elem_len = filter->elem_buf.p - filter->elem_buf.start; 590 #ifdef NDB_LOG 591 orig_size = filter->data_buf.end - filter->elem_buf.start; 592 #endif 593 594 // cap the elem buff 595 filter->elem_buf.end = filter->elem_buf.p; 596 597 // move the data buffer to the end of the element buffer 598 memmove(filter->elem_buf.p, filter->data_buf.start, data_len); 599 600 // realloc the whole thing 601 filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len); 602 filter->elem_buf.end = filter->elem_buf.start + elem_len; 603 filter->elem_buf.p = filter->elem_buf.end; 604 605 filter->data_buf.start = filter->elem_buf.end; 606 filter->data_buf.end = filter->data_buf.start + data_len; 607 filter->data_buf.p = filter->data_buf.end; 608 609 filter->finalized = 1; 610 611 ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len); 612 613 return 1; 614 } 615 616 static inline struct ndb_filter_elements * 617 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset) 618 { 619 struct ndb_filter_elements *els; 620 621 if (offset < 0) 622 return NULL; 623 624 els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset); 625 626 if ((unsigned char *)els > filter->elem_buf.p) 627 return NULL; 628 629 return els; 630 } 631 632 struct ndb_filter_elements * 633 ndb_filter_current_element(const struct ndb_filter *filter) 634 { 635 return ndb_filter_get_elements_by_offset(filter, filter->current); 636 } 637 638 struct ndb_filter_elements * 639 ndb_filter_get_elements(const struct ndb_filter *filter, int index) 640 { 641 if (filter->num_elements <= 0) 642 return NULL; 643 644 if (index > filter->num_elements-1) 645 return NULL; 646 647 return ndb_filter_get_elements_by_offset(filter, filter->elements[index]); 648 } 649 650 static inline unsigned char * 651 ndb_filter_elements_data(const struct ndb_filter *filter, int offset) 652 { 653 unsigned char *data; 654 655 if (offset < 0) 656 return NULL; 657 658 data = filter->data_buf.start + offset; 659 if (data > filter->data_buf.p) 660 return NULL; 661 662 return data; 663 } 664 665 unsigned char * 666 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 667 { 668 return ndb_filter_elements_data(filter, els->elements[index]); 669 } 670 671 const char * 672 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 673 { 674 return (const char *)ndb_filter_elements_data(filter, els->elements[index]); 675 } 676 677 uint64_t * 678 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index) 679 { 680 return &els->elements[index]; 681 } 682 683 uint64_t 684 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index) 685 { 686 return els->elements[index]; 687 } 688 689 int ndb_filter_init_with(struct ndb_filter *filter, int pages) 690 { 691 struct cursor cur; 692 int page_size, elem_size, data_size, buf_size; 693 694 page_size = 4096; // assuming this, not a big deal if we're wrong 695 696 buf_size = page_size * pages; 697 elem_size = buf_size / 4; 698 data_size = buf_size - elem_size; 699 700 unsigned char *buf = malloc(buf_size); 701 if (!buf) 702 return 0; 703 704 // init memory arena for the cursor 705 make_cursor(buf, buf + buf_size, &cur); 706 707 cursor_slice(&cur, &filter->elem_buf, elem_size); 708 cursor_slice(&cur, &filter->data_buf, data_size); 709 710 // make sure we are fully allocated 711 assert(cur.p == cur.end); 712 713 // make sure elem_buf is the start of the buffer 714 assert(filter->elem_buf.start == cur.start); 715 716 filter->num_elements = 0; 717 filter->elements[0] = 0; 718 filter->current = -1; 719 filter->finalized = 0; 720 721 return 1; 722 } 723 724 int ndb_filter_init(struct ndb_filter *filter) { 725 return ndb_filter_init_with(filter, NDB_FILTER_PAGES); 726 } 727 728 void ndb_filter_destroy(struct ndb_filter *filter) 729 { 730 if (filter->elem_buf.start) 731 free(filter->elem_buf.start); 732 733 memset(filter, 0, sizeof(*filter)); 734 } 735 736 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 737 { 738 switch (field) { 739 case NDB_FILTER_IDS: return "ids"; 740 case NDB_FILTER_AUTHORS: return "authors"; 741 case NDB_FILTER_KINDS: return "kinds"; 742 case NDB_FILTER_TAGS: return "tags"; 743 case NDB_FILTER_SINCE: return "since"; 744 case NDB_FILTER_UNTIL: return "until"; 745 case NDB_FILTER_LIMIT: return "limit"; 746 case NDB_FILTER_SEARCH: return "search"; 747 } 748 749 return "unknown"; 750 } 751 752 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag) 753 { 754 int i; 755 struct ndb_filter_elements *els, *el; 756 757 if (ndb_filter_current_element(filter)) { 758 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 759 return 0; 760 } 761 762 // you can only start and end fields once 763 for (i = 0; i < filter->num_elements; i++) { 764 el = ndb_filter_get_elements(filter, i); 765 assert(el); 766 // TODO: fix this tags check to try to find matching tags 767 if (el->field.type == field && field != NDB_FILTER_TAGS) { 768 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 769 ndb_filter_field_name(field)); 770 return 0; 771 } 772 } 773 774 filter->current = filter->elem_buf.p - filter->elem_buf.start; 775 els = ndb_filter_current_element(filter); 776 assert(els); 777 778 // advance elem buffer to the variable data section 779 if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 780 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 781 ndb_filter_field_name(field)); 782 return 0; 783 } 784 785 els->field.type = field; 786 els->field.tag = tag; 787 els->field.elem_type = 0; 788 els->count = 0; 789 790 return 1; 791 } 792 793 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 794 { 795 return ndb_filter_start_field_impl(filter, field, 0); 796 } 797 798 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag) 799 { 800 return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag); 801 } 802 803 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 804 { 805 struct ndb_filter_elements *current; 806 uint64_t offset; 807 808 if (!(current = ndb_filter_current_element(filter))) 809 return 0; 810 811 offset = filter->data_buf.p - filter->data_buf.start; 812 813 switch (current->field.type) { 814 case NDB_FILTER_IDS: 815 case NDB_FILTER_AUTHORS: 816 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 817 return 0; 818 break; 819 case NDB_FILTER_KINDS: 820 offset = el.integer; 821 break; 822 case NDB_FILTER_SINCE: 823 case NDB_FILTER_UNTIL: 824 case NDB_FILTER_LIMIT: 825 // only one allowed for since/until 826 if (current->count != 0) 827 return 0; 828 offset = el.integer; 829 break; 830 case NDB_FILTER_SEARCH: 831 if (current->field.elem_type != NDB_ELEMENT_STRING) { 832 return 0; 833 } 834 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 835 return 0; 836 if (!cursor_push_byte(&filter->data_buf, 0)) 837 return 0; 838 break; 839 case NDB_FILTER_TAGS: 840 switch (current->field.elem_type) { 841 case NDB_ELEMENT_ID: 842 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 843 return 0; 844 break; 845 case NDB_ELEMENT_STRING: 846 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 847 return 0; 848 if (!cursor_push_byte(&filter->data_buf, 0)) 849 return 0; 850 break; 851 case NDB_ELEMENT_INT: 852 // ints are not allowed in tag filters 853 case NDB_ELEMENT_UNKNOWN: 854 return 0; 855 } 856 // push a pointer of the string in the databuf as an element 857 break; 858 } 859 860 if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset, 861 sizeof(offset))) { 862 return 0; 863 } 864 865 current->count++; 866 867 return 1; 868 } 869 870 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 871 enum ndb_generic_element_type elem_type) 872 { 873 enum ndb_generic_element_type current_elem_type; 874 struct ndb_filter_elements *current; 875 876 if (!(current = ndb_filter_current_element(filter))) 877 return 0; 878 879 current_elem_type = current->field.elem_type; 880 881 // element types must be uniform 882 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 883 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 884 return 0; 885 } 886 887 current->field.elem_type = elem_type; 888 889 return 1; 890 } 891 892 893 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len) 894 { 895 union ndb_filter_element el; 896 struct ndb_filter_elements *current; 897 898 if (!(current = ndb_filter_current_element(filter))) 899 return 0; 900 901 // only generic tags and search queries are allowed to have strings 902 switch (current->field.type) { 903 case NDB_FILTER_SINCE: 904 case NDB_FILTER_UNTIL: 905 case NDB_FILTER_LIMIT: 906 case NDB_FILTER_IDS: 907 case NDB_FILTER_AUTHORS: 908 case NDB_FILTER_KINDS: 909 return 0; 910 case NDB_FILTER_SEARCH: 911 if (current->count == 1) { 912 // you can't add more than one string to a search 913 return 0; 914 } 915 break; 916 case NDB_FILTER_TAGS: 917 break; 918 } 919 920 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 921 return 0; 922 923 el.string.string = str; 924 el.string.len = len; 925 926 return ndb_filter_add_element(filter, el); 927 } 928 929 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 930 { 931 return ndb_filter_add_str_element_len(filter, str, strlen(str)); 932 } 933 934 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 935 { 936 union ndb_filter_element el; 937 struct ndb_filter_elements *current; 938 if (!(current = ndb_filter_current_element(filter))) 939 return 0; 940 941 switch (current->field.type) { 942 case NDB_FILTER_IDS: 943 case NDB_FILTER_AUTHORS: 944 case NDB_FILTER_TAGS: 945 case NDB_FILTER_SEARCH: 946 return 0; 947 case NDB_FILTER_KINDS: 948 case NDB_FILTER_SINCE: 949 case NDB_FILTER_UNTIL: 950 case NDB_FILTER_LIMIT: 951 break; 952 } 953 954 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT)) 955 return 0; 956 957 el.integer = integer; 958 959 return ndb_filter_add_element(filter, el); 960 } 961 962 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 963 { 964 union ndb_filter_element el; 965 struct ndb_filter_elements *current; 966 967 if (!(current = ndb_filter_current_element(filter))) 968 return 0; 969 970 // only certain filter types allow pushing id elements 971 switch (current->field.type) { 972 case NDB_FILTER_SINCE: 973 case NDB_FILTER_UNTIL: 974 case NDB_FILTER_LIMIT: 975 case NDB_FILTER_KINDS: 976 case NDB_FILTER_SEARCH: 977 return 0; 978 case NDB_FILTER_IDS: 979 case NDB_FILTER_AUTHORS: 980 case NDB_FILTER_TAGS: 981 break; 982 } 983 984 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 985 return 0; 986 987 // this is needed so that generic filters know its an id 988 el.id = id; 989 990 return ndb_filter_add_element(filter, el); 991 } 992 993 static int ndb_tag_filter_matches(struct ndb_filter *filter, 994 struct ndb_filter_elements *els, 995 struct ndb_note *note) 996 { 997 int i; 998 const unsigned char *id; 999 const char *el_str; 1000 struct ndb_iterator iter, *it = &iter; 1001 struct ndb_str str; 1002 1003 ndb_tags_iterate_start(note, it); 1004 1005 while (ndb_tags_iterate_next(it)) { 1006 // we're looking for tags with 2 or more entries: ["p", id], etc 1007 if (it->tag->count < 2) 1008 continue; 1009 1010 str = ndb_tag_str(note, it->tag, 0); 1011 1012 // we only care about packed strings (single char, etc) 1013 if (str.flag != NDB_PACKED_STR) 1014 continue; 1015 1016 // do we have #e matching e (or p, etc) 1017 if (str.str[0] != els->field.tag || str.str[1] != 0) 1018 continue; 1019 1020 str = ndb_tag_str(note, it->tag, 1); 1021 1022 switch (els->field.elem_type) { 1023 case NDB_ELEMENT_ID: 1024 // if our filter element type is an id, then we 1025 // expect a packed id in the tag, otherwise skip 1026 if (str.flag != NDB_PACKED_ID) 1027 continue; 1028 break; 1029 case NDB_ELEMENT_STRING: 1030 // if our filter element type is a string, then 1031 // we should not expect an id 1032 if (str.flag == NDB_PACKED_ID) 1033 continue; 1034 1035 break; 1036 case NDB_ELEMENT_UNKNOWN: 1037 default: 1038 // For some reason the element type is not set. It's 1039 // possible nothing was added to the generic filter? 1040 // Let's just fail here and log a note for debugging 1041 fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type); 1042 return 0; 1043 } 1044 1045 for (i = 0; i < els->count; i++) { 1046 switch (els->field.elem_type) { 1047 case NDB_ELEMENT_ID: 1048 id = ndb_filter_get_id_element(filter, els, i); 1049 if (!memcmp(id, str.id, 32)) 1050 return 1; 1051 break; 1052 case NDB_ELEMENT_STRING: 1053 el_str = ndb_filter_get_string_element(filter, els, i); 1054 if (!strcmp(el_str, str.str)) 1055 return 1; 1056 break; 1057 case NDB_ELEMENT_INT: 1058 // int elements int tag queries are not supported 1059 case NDB_ELEMENT_UNKNOWN: 1060 return 0; 1061 } 1062 } 1063 } 1064 1065 return 0; 1066 } 1067 1068 struct search_id_state { 1069 struct ndb_filter *filter; 1070 struct ndb_filter_elements *els; 1071 unsigned char *key; 1072 }; 1073 1074 static int compare_ids(const void *pa, const void *pb) 1075 { 1076 unsigned char *a = *(unsigned char **)pa; 1077 unsigned char *b = *(unsigned char **)pb; 1078 1079 return memcmp(a, b, 32); 1080 } 1081 1082 static int search_ids(const void *ctx, const void *mid_ptr) 1083 { 1084 struct search_id_state *state; 1085 unsigned char *mid_id; 1086 uint32_t mid; 1087 1088 state = (struct search_id_state *)ctx; 1089 mid = *(uint32_t *)mid_ptr; 1090 1091 mid_id = ndb_filter_elements_data(state->filter, mid); 1092 assert(mid_id); 1093 1094 return memcmp(state->key, mid_id, 32); 1095 } 1096 1097 static int compare_kinds(const void *pa, const void *pb) 1098 { 1099 1100 // NOTE: this should match type in `union ndb_filter_element` 1101 uint64_t a = *(uint64_t *)pa; 1102 uint64_t b = *(uint64_t *)pb; 1103 1104 if (a < b) { 1105 return -1; 1106 } else if (a > b) { 1107 return 1; 1108 } else { 1109 return 0; 1110 } 1111 } 1112 1113 // 1114 // returns 1 if a filter matches a note 1115 static int ndb_filter_matches_with(struct ndb_filter *filter, 1116 struct ndb_note *note, int already_matched) 1117 { 1118 int i, j; 1119 struct ndb_filter_elements *els; 1120 struct search_id_state state; 1121 1122 state.filter = filter; 1123 1124 for (i = 0; i < filter->num_elements; i++) { 1125 els = ndb_filter_get_elements(filter, i); 1126 state.els = els; 1127 assert(els); 1128 1129 // if we know we already match from a query scan result, 1130 // we can skip this check 1131 if ((1 << els->field.type) & already_matched) 1132 continue; 1133 1134 switch (els->field.type) { 1135 case NDB_FILTER_KINDS: 1136 for (j = 0; j < els->count; j++) { 1137 if ((unsigned int)els->elements[j] == note->kind) 1138 goto cont; 1139 } 1140 break; 1141 case NDB_FILTER_IDS: 1142 state.key = ndb_note_id(note); 1143 if (bsearch(&state, &els->elements[0], els->count, 1144 sizeof(els->elements[0]), search_ids)) { 1145 continue; 1146 } 1147 break; 1148 case NDB_FILTER_AUTHORS: 1149 state.key = ndb_note_pubkey(note); 1150 if (bsearch(&state, &els->elements[0], els->count, 1151 sizeof(els->elements[0]), search_ids)) { 1152 continue; 1153 } 1154 break; 1155 case NDB_FILTER_TAGS: 1156 if (ndb_tag_filter_matches(filter, els, note)) 1157 continue; 1158 break; 1159 case NDB_FILTER_SINCE: 1160 assert(els->count == 1); 1161 if (note->created_at >= els->elements[0]) 1162 continue; 1163 break; 1164 case NDB_FILTER_UNTIL: 1165 assert(els->count == 1); 1166 if (note->created_at < els->elements[0]) 1167 continue; 1168 break; 1169 case NDB_FILTER_SEARCH: 1170 // TODO: matching search filters will need an accelerated 1171 // data structure, like our minimal perfect hashmap 1172 // idea for mutewords. 1173 // 1174 // We'll also want to store tokenized words in the filter 1175 // itself, so that we can walk over each word and check 1176 // the hashmap to see if the note contains at least 1177 // one word. 1178 // 1179 // For now we always return true, since we assume 1180 // the search index will be walked for these kinds 1181 // of queries. 1182 continue; 1183 case NDB_FILTER_LIMIT: 1184 cont: 1185 continue; 1186 } 1187 1188 // all need to match 1189 return 0; 1190 } 1191 1192 return 1; 1193 } 1194 1195 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 1196 { 1197 return ndb_filter_matches_with(filter, note, 0); 1198 } 1199 1200 // because elements are stored as offsets and qsort doesn't support context, 1201 // we do a dumb thing where we convert elements to pointers and back if we 1202 // are doing an id or string sort 1203 static void sort_filter_elements(struct ndb_filter *filter, 1204 struct ndb_filter_elements *els, 1205 int (*cmp)(const void *, const void *)) 1206 { 1207 int i; 1208 1209 assert(ndb_filter_elem_is_ptr(&els->field)); 1210 1211 for (i = 0; i < els->count; i++) 1212 els->elements[i] += (uint64_t)filter->data_buf.start; 1213 1214 qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp); 1215 1216 for (i = 0; i < els->count; i++) 1217 els->elements[i] -= (uint64_t)filter->data_buf.start; 1218 } 1219 1220 static int ndb_filter_field_eq(struct ndb_filter *a_filt, 1221 struct ndb_filter_elements *a_field, 1222 struct ndb_filter *b_filt, 1223 struct ndb_filter_elements *b_field) 1224 { 1225 int i; 1226 const char *a_str, *b_str; 1227 unsigned char *a_id, *b_id; 1228 uint64_t a_int, b_int; 1229 1230 if (a_field->count != b_field->count) 1231 return 0; 1232 1233 if (a_field->field.type != b_field->field.type) { 1234 ndb_debug("UNUSUAL: field types do not match in ndb_filter_field_eq\n"); 1235 return 0; 1236 } 1237 1238 if (a_field->field.elem_type != b_field->field.elem_type) { 1239 ndb_debug("UNUSUAL: field element types do not match in ndb_filter_field_eq\n"); 1240 return 0; 1241 } 1242 1243 if (a_field->field.elem_type == NDB_ELEMENT_UNKNOWN) { 1244 ndb_debug("UNUSUAL: field element types are unknown\n"); 1245 return 0; 1246 } 1247 1248 for (i = 0; i < a_field->count; i++) { 1249 switch (a_field->field.elem_type) { 1250 case NDB_ELEMENT_UNKNOWN: 1251 return 0; 1252 case NDB_ELEMENT_STRING: 1253 a_str = ndb_filter_get_string_element(a_filt, a_field, i); 1254 b_str = ndb_filter_get_string_element(b_filt, b_field, i); 1255 if (strcmp(a_str, b_str)) 1256 return 0; 1257 break; 1258 case NDB_ELEMENT_ID: 1259 a_id = ndb_filter_get_id_element(a_filt, a_field, i); 1260 b_id = ndb_filter_get_id_element(b_filt, b_field, i); 1261 if (memcmp(a_id, b_id, 32)) 1262 return 0; 1263 break; 1264 case NDB_ELEMENT_INT: 1265 a_int = ndb_filter_get_int_element(a_field, i); 1266 b_int = ndb_filter_get_int_element(b_field, i); 1267 if (a_int != b_int) 1268 return 0; 1269 break; 1270 } 1271 } 1272 1273 return 1; 1274 } 1275 1276 void ndb_filter_end_field(struct ndb_filter *filter) 1277 { 1278 int cur_offset; 1279 struct ndb_filter_elements *cur; 1280 1281 cur_offset = filter->current; 1282 1283 if (!(cur = ndb_filter_current_element(filter))) 1284 return; 1285 1286 filter->elements[filter->num_elements++] = cur_offset; 1287 1288 // sort elements for binary search 1289 switch (cur->field.type) { 1290 case NDB_FILTER_IDS: 1291 case NDB_FILTER_AUTHORS: 1292 sort_filter_elements(filter, cur, compare_ids); 1293 break; 1294 case NDB_FILTER_KINDS: 1295 qsort(&cur->elements[0], cur->count, 1296 sizeof(cur->elements[0]), compare_kinds); 1297 break; 1298 case NDB_FILTER_TAGS: 1299 // TODO: generic tag search sorting 1300 break; 1301 case NDB_FILTER_SINCE: 1302 case NDB_FILTER_UNTIL: 1303 case NDB_FILTER_LIMIT: 1304 case NDB_FILTER_SEARCH: 1305 // don't need to sort these 1306 break; 1307 } 1308 1309 filter->current = -1; 1310 1311 } 1312 1313 static void ndb_filter_group_init(struct ndb_filter_group *group) 1314 { 1315 group->num_filters = 0; 1316 } 1317 1318 static int ndb_filter_group_add(struct ndb_filter_group *group, 1319 struct ndb_filter *filter) 1320 { 1321 if (group->num_filters + 1 > MAX_FILTERS) 1322 return 0; 1323 1324 return ndb_filter_clone(&group->filters[group->num_filters++], filter); 1325 } 1326 1327 static int ndb_filter_group_matches(struct ndb_filter_group *group, 1328 struct ndb_note *note) 1329 { 1330 int i; 1331 struct ndb_filter *filter; 1332 1333 if (group->num_filters == 0) 1334 return 1; 1335 1336 for (i = 0; i < group->num_filters; i++) { 1337 filter = &group->filters[i]; 1338 1339 if (ndb_filter_matches(filter, note)) 1340 return 1; 1341 } 1342 1343 return 0; 1344 } 1345 1346 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 1347 uint64_t timestamp, const char *search) 1348 { 1349 memcpy(key->id, id, 32); 1350 key->timestamp = timestamp; 1351 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1352 key->search[sizeof(key->search) - 1] = '\0'; 1353 } 1354 1355 static int ndb_write_profile_search_index(struct ndb_txn *txn, 1356 struct ndb_search_key *index_key, 1357 uint64_t profile_key) 1358 { 1359 int rc; 1360 MDB_val key, val; 1361 1362 key.mv_data = index_key; 1363 key.mv_size = sizeof(*index_key); 1364 val.mv_data = &profile_key; 1365 val.mv_size = sizeof(profile_key); 1366 1367 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1368 &key, &val, 0))) 1369 { 1370 ndb_debug("ndb_write_profile_search_index failed: %s\n", 1371 mdb_strerror(rc)); 1372 return 0; 1373 } 1374 1375 return 1; 1376 } 1377 1378 1379 // map usernames and display names to profile keys for user searching 1380 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 1381 struct ndb_note *note, 1382 uint64_t profile_key, 1383 void *profile_root) 1384 { 1385 struct ndb_search_key index; 1386 NdbProfileRecord_table_t profile_record; 1387 NdbProfile_table_t profile; 1388 1389 profile_record = NdbProfileRecord_as_root(profile_root); 1390 profile = NdbProfileRecord_profile_get(profile_record); 1391 1392 const char *name = NdbProfile_name_get(profile); 1393 const char *display_name = NdbProfile_display_name_get(profile); 1394 1395 // words + pubkey + created 1396 if (name) { 1397 ndb_make_search_key(&index, note->pubkey, note->created_at, 1398 name); 1399 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1400 return 0; 1401 } 1402 1403 if (display_name) { 1404 // don't write the same name/display_name twice 1405 if (name && !strcmp(display_name, name)) { 1406 return 1; 1407 } 1408 ndb_make_search_key(&index, note->pubkey, note->created_at, 1409 display_name); 1410 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1411 return 0; 1412 } 1413 1414 return 1; 1415 } 1416 1417 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1418 uint64_t timestamp) 1419 { 1420 memcpy(key->id, id, 32); 1421 key->timestamp = timestamp; 1422 } 1423 1424 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1425 { 1426 memcpy(key->id, id, 32); 1427 key->timestamp = 0; 1428 } 1429 1430 static inline void ndb_u64_ts_init(struct ndb_u64_ts *key, uint64_t integer, 1431 uint64_t timestamp) 1432 { 1433 key->u64 = integer; 1434 key->timestamp = timestamp; 1435 } 1436 1437 // useful for range-searching for the latest key with a clustered created_at timen 1438 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1439 { 1440 memcpy(key->id, id, 32); 1441 key->timestamp = UINT64_MAX; 1442 } 1443 1444 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 1445 { 1446 txn->lmdb = &ndb->lmdb; 1447 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 1448 if (!txn->lmdb->env) 1449 return 0; 1450 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 1451 } 1452 1453 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 1454 { 1455 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 1456 } 1457 1458 // this should only be used in migrations, etc 1459 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 1460 { 1461 return _ndb_begin_query(ndb, txn, 0); 1462 } 1463 1464 static int ndb_db_is_index(enum ndb_dbs index) 1465 { 1466 switch (index) { 1467 case NDB_DB_NOTE: 1468 case NDB_DB_META: 1469 case NDB_DB_PROFILE: 1470 case NDB_DB_NOTE_ID: 1471 case NDB_DB_NDB_META: 1472 case NDB_DB_PROFILE_SEARCH: 1473 case NDB_DB_PROFILE_LAST_FETCH: 1474 case NDB_DBS: 1475 return 0; 1476 case NDB_DB_PROFILE_PK: 1477 case NDB_DB_NOTE_KIND: 1478 case NDB_DB_NOTE_TEXT: 1479 case NDB_DB_NOTE_BLOCKS: 1480 case NDB_DB_NOTE_TAGS: 1481 case NDB_DB_NOTE_PUBKEY: 1482 case NDB_DB_NOTE_PUBKEY_KIND: 1483 return 1; 1484 } 1485 1486 return 0; 1487 } 1488 1489 static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key, 1490 unsigned char *id, uint64_t iu64, 1491 uint64_t timestamp) 1492 { 1493 memcpy(key->id, id, 32); 1494 key->u64 = iu64; 1495 key->timestamp = timestamp; 1496 } 1497 1498 static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note, 1499 uint64_t note_key) 1500 { 1501 int rc; 1502 struct ndb_tsid key; 1503 MDB_val k, v; 1504 1505 ndb_tsid_init(&key, ndb_note_pubkey(note), ndb_note_created_at(note)); 1506 1507 k.mv_data = &key; 1508 k.mv_size = sizeof(key); 1509 1510 v.mv_data = ¬e_key; 1511 v.mv_size = sizeof(note_key); 1512 1513 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY], &k, &v, 0))) { 1514 fprintf(stderr, "write note pubkey index failed: %s\n", 1515 mdb_strerror(rc)); 1516 return 0; 1517 } 1518 1519 return 1; 1520 } 1521 1522 static int ndb_write_note_pubkey_kind_index(struct ndb_txn *txn, 1523 struct ndb_note *note, 1524 uint64_t note_key) 1525 { 1526 int rc; 1527 struct ndb_id_u64_ts key; 1528 MDB_val k, v; 1529 1530 ndb_id_u64_ts_init(&key, ndb_note_pubkey(note), ndb_note_kind(note), 1531 ndb_note_created_at(note)); 1532 1533 k.mv_data = &key; 1534 k.mv_size = sizeof(key); 1535 1536 v.mv_data = ¬e_key; 1537 v.mv_size = sizeof(note_key); 1538 1539 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], &k, &v, 0))) { 1540 fprintf(stderr, "write note pubkey_kind index failed: %s\n", 1541 mdb_strerror(rc)); 1542 return 0; 1543 } 1544 1545 return 1; 1546 } 1547 1548 1549 static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, int num_indices) 1550 { 1551 MDB_val k, v; 1552 MDB_cursor *cur; 1553 int i, drop_dbi, count, rc; 1554 uint64_t note_key; 1555 struct ndb_note *note; 1556 enum ndb_dbs index; 1557 1558 // 0 means empty, not delete the dbi 1559 drop_dbi = 0; 1560 1561 // ensure they are all index dbs 1562 for (i = 0; i < num_indices; i++) { 1563 if (!ndb_db_is_index(indices[i])) { 1564 fprintf(stderr, "ndb_rebuild_note_index: %s is not an index db\n", ndb_db_name(indices[i])); 1565 return -1; 1566 } 1567 } 1568 1569 // empty the index dbs before we rebuild 1570 for (i = 0; i < num_indices; i++) { 1571 index = indices[i]; 1572 if (mdb_drop(txn->mdb_txn, index, drop_dbi)) { 1573 fprintf(stderr, "ndb_rebuild_pubkey_index: mdb_drop failed for %s\n", ndb_db_name(index)); 1574 return -1; 1575 } 1576 } 1577 1578 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE], &cur))) { 1579 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1580 return -1; 1581 } 1582 1583 count = 0; 1584 1585 // loop through all notes and write search indices 1586 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1587 note = v.mv_data; 1588 note_key = *((uint64_t*)k.mv_data); 1589 1590 for (i = 0; i < num_indices; i++) { 1591 index = indices[i]; 1592 switch (index) { 1593 case NDB_DB_NOTE: 1594 case NDB_DB_META: 1595 case NDB_DB_PROFILE: 1596 case NDB_DB_NOTE_ID: 1597 case NDB_DB_NDB_META: 1598 case NDB_DB_PROFILE_SEARCH: 1599 case NDB_DB_PROFILE_LAST_FETCH: 1600 case NDB_DBS: 1601 // this should never happen since we check at 1602 // the start 1603 count = -1; 1604 goto cleanup; 1605 case NDB_DB_PROFILE_PK: 1606 case NDB_DB_NOTE_KIND: 1607 case NDB_DB_NOTE_TEXT: 1608 case NDB_DB_NOTE_BLOCKS: 1609 case NDB_DB_NOTE_TAGS: 1610 fprintf(stderr, "%s index rebuild not supported yet. sorry.\n", ndb_db_name(index)); 1611 count = -1; 1612 goto cleanup; 1613 case NDB_DB_NOTE_PUBKEY: 1614 if (!ndb_write_note_pubkey_index(txn, note, note_key)) { 1615 count = -1; 1616 goto cleanup; 1617 } 1618 break; 1619 case NDB_DB_NOTE_PUBKEY_KIND: 1620 if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) { 1621 count = -1; 1622 goto cleanup; 1623 } 1624 break; 1625 } 1626 } 1627 1628 count++; 1629 } 1630 1631 cleanup: 1632 mdb_cursor_close(cur); 1633 1634 return count; 1635 } 1636 1637 1638 // Migrations 1639 // 1640 1641 // This was before we had note_profile_pubkey{,_kind} indices. Let's create them. 1642 static int ndb_migrate_profile_indices(struct ndb_txn *txn) 1643 { 1644 int count; 1645 1646 enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; 1647 if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) { 1648 fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); 1649 return 1; 1650 } else { 1651 fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); 1652 return 0; 1653 } 1654 } 1655 1656 static int ndb_migrate_user_search_indices(struct ndb_txn *txn) 1657 { 1658 int rc; 1659 MDB_cursor *cur; 1660 MDB_val k, v; 1661 void *profile_root; 1662 NdbProfileRecord_table_t record; 1663 struct ndb_note *note; 1664 uint64_t note_key, profile_key; 1665 size_t len; 1666 int count; 1667 1668 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1669 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1670 return 0; 1671 } 1672 1673 count = 0; 1674 1675 // loop through all profiles and write search indices 1676 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1677 profile_root = v.mv_data; 1678 profile_key = *((uint64_t*)k.mv_data); 1679 record = NdbProfileRecord_as_root(profile_root); 1680 note_key = NdbProfileRecord_note_key(record); 1681 note = ndb_get_note_by_key(txn, note_key, &len); 1682 1683 if (note == NULL) { 1684 continue; 1685 } 1686 1687 if (!ndb_write_profile_search_indices(txn, note, profile_key, 1688 profile_root)) { 1689 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 1690 continue; 1691 } 1692 1693 count++; 1694 } 1695 1696 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 1697 1698 mdb_cursor_close(cur); 1699 1700 return 1; 1701 } 1702 1703 static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn) 1704 { 1705 // just drop the search db so we can rebuild it 1706 if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) { 1707 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 1708 return 0; 1709 } 1710 1711 return ndb_migrate_user_search_indices(txn); 1712 } 1713 1714 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 1715 1716 1717 int ndb_db_version(struct ndb_txn *txn) 1718 { 1719 uint64_t version, version_key; 1720 MDB_val k, v; 1721 1722 version_key = NDB_META_KEY_VERSION; 1723 k.mv_data = &version_key; 1724 k.mv_size = sizeof(version_key); 1725 1726 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) { 1727 version = -1; 1728 } else { 1729 if (v.mv_size != 8) { 1730 fprintf(stderr, "run_migrations: invalid version size?"); 1731 return 0; 1732 } 1733 version = *((uint64_t*)v.mv_data); 1734 } 1735 1736 return version; 1737 } 1738 1739 // custom pubkey+kind+timestamp comparison function. This is used by lmdb to 1740 // perform b+ tree searches over the pubkey+kind+timestamp index 1741 static int ndb_id_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1742 { 1743 struct ndb_id_u64_ts *tsa, *tsb; 1744 MDB_val a2 = *a, b2 = *b; 1745 1746 a2.mv_size = sizeof(tsa->id); 1747 b2.mv_size = sizeof(tsb->id); 1748 1749 int cmp = mdb_cmp_memn(&a2, &b2); 1750 if (cmp) return cmp; 1751 1752 tsa = a->mv_data; 1753 tsb = b->mv_data; 1754 1755 if (tsa->u64 < tsb->u64) 1756 return -1; 1757 else if (tsa->u64 > tsb->u64) 1758 return 1; 1759 1760 if (tsa->timestamp < tsb->timestamp) 1761 return -1; 1762 else if (tsa->timestamp > tsb->timestamp) 1763 return 1; 1764 1765 return 0; 1766 } 1767 1768 // custom kind+timestamp comparison function. This is used by lmdb to perform 1769 // b+ tree searches over the kind+timestamp index 1770 static int ndb_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1771 { 1772 struct ndb_u64_ts *tsa, *tsb; 1773 tsa = a->mv_data; 1774 tsb = b->mv_data; 1775 1776 if (tsa->u64 < tsb->u64) 1777 return -1; 1778 else if (tsa->u64 > tsb->u64) 1779 return 1; 1780 1781 if (tsa->timestamp < tsb->timestamp) 1782 return -1; 1783 else if (tsa->timestamp > tsb->timestamp) 1784 return 1; 1785 1786 return 0; 1787 } 1788 1789 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1790 { 1791 struct ndb_tsid *tsa, *tsb; 1792 MDB_val a2 = *a, b2 = *b; 1793 1794 a2.mv_size = sizeof(tsa->id); 1795 b2.mv_size = sizeof(tsb->id); 1796 1797 int cmp = mdb_cmp_memn(&a2, &b2); 1798 if (cmp) return cmp; 1799 1800 tsa = a->mv_data; 1801 tsb = b->mv_data; 1802 1803 if (tsa->timestamp < tsb->timestamp) 1804 return -1; 1805 else if (tsa->timestamp > tsb->timestamp) 1806 return 1; 1807 return 0; 1808 } 1809 1810 enum ndb_ingester_msgtype { 1811 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1812 NDB_INGEST_QUIT, // kill ingester thread immediately 1813 }; 1814 1815 struct ndb_ingester_event { 1816 char *json; 1817 unsigned client : 1; // ["EVENT", {...}] messages 1818 unsigned len : 31; 1819 }; 1820 1821 struct ndb_writer_note { 1822 struct ndb_note *note; 1823 size_t note_len; 1824 }; 1825 1826 struct ndb_writer_profile { 1827 struct ndb_writer_note note; 1828 struct ndb_profile_record_builder record; 1829 }; 1830 1831 struct ndb_ingester_msg { 1832 enum ndb_ingester_msgtype type; 1833 union { 1834 struct ndb_ingester_event event; 1835 }; 1836 }; 1837 1838 struct ndb_writer_ndb_meta { 1839 // these are 64 bit because I'm paranoid of db-wide alignment issues 1840 uint64_t version; 1841 }; 1842 1843 // Used in the writer thread when writing ndb_profile_fetch_record's 1844 // kv = pubkey: recor 1845 struct ndb_writer_last_fetch { 1846 unsigned char pubkey[32]; 1847 uint64_t fetched_at; 1848 }; 1849 1850 // write note blocks 1851 struct ndb_writer_blocks { 1852 struct ndb_blocks *blocks; 1853 uint64_t note_key; 1854 }; 1855 1856 // The different types of messages that the writer thread can write to the 1857 // database 1858 struct ndb_writer_msg { 1859 enum ndb_writer_msgtype type; 1860 union { 1861 struct ndb_writer_note note; 1862 struct ndb_writer_profile profile; 1863 struct ndb_writer_ndb_meta ndb_meta; 1864 struct ndb_writer_last_fetch last_fetch; 1865 struct ndb_writer_blocks blocks; 1866 }; 1867 }; 1868 1869 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 1870 struct ndb_writer_msg *msg) 1871 { 1872 return prot_queue_push(&writer->inbox, msg); 1873 } 1874 1875 static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags); 1876 static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn) 1877 { 1878 int rc; 1879 MDB_cursor *cur; 1880 MDB_val k, v; 1881 void *profile_root; 1882 NdbProfileRecord_table_t record; 1883 struct ndb_note *note, *copied_note; 1884 uint64_t note_key; 1885 size_t len; 1886 int count, failed, ret; 1887 struct ndb_writer_profile profile; 1888 1889 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1890 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 1891 return 0; 1892 } 1893 1894 size_t scratch_size = 8 * 1024 * 1024; 1895 unsigned char *scratch = malloc(scratch_size); 1896 1897 ret = 1; 1898 count = 0; 1899 failed = 0; 1900 1901 // loop through all profiles and write search indices 1902 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1903 profile_root = v.mv_data; 1904 record = NdbProfileRecord_as_root(profile_root); 1905 note_key = NdbProfileRecord_note_key(record); 1906 note = ndb_get_note_by_key(txn, note_key, &len); 1907 1908 if (note == NULL) { 1909 failed++; 1910 continue; 1911 } 1912 1913 struct ndb_profile_record_builder *b = &profile.record; 1914 1915 // reprocess profile 1916 if (!ndb_process_profile_note(note, b)) { 1917 failed++; 1918 continue; 1919 } 1920 1921 // the writer needs to own this note, and its expected to free it 1922 copied_note = malloc(len); 1923 memcpy(copied_note, note, len); 1924 1925 profile.note.note = copied_note; 1926 profile.note.note_len = len; 1927 1928 // we don't pass in flags when migrating... a bit sketchy but 1929 // whatever. noone is using this to customize nostrdb atm 1930 if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) { 1931 count++; 1932 } 1933 } 1934 1935 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 1936 1937 if (failed != 0) { 1938 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 1939 } 1940 1941 free(scratch); 1942 mdb_cursor_close(cur); 1943 1944 return ret; 1945 } 1946 1947 static struct ndb_migration MIGRATIONS[] = { 1948 { .fn = ndb_migrate_user_search_indices }, 1949 { .fn = ndb_migrate_lower_user_search_indices }, 1950 { .fn = ndb_migrate_utf8_profile_names }, 1951 { .fn = ndb_migrate_profile_indices }, 1952 }; 1953 1954 1955 int ndb_end_query(struct ndb_txn *txn) 1956 { 1957 // this works on read or write queries. 1958 return mdb_txn_commit(txn->mdb_txn) == 0; 1959 } 1960 1961 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 1962 unsigned char sig[64]) 1963 { 1964 secp256k1_xonly_pubkey xonly_pubkey; 1965 int ok; 1966 1967 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 1968 pubkey) != 0; 1969 if (!ok) return 0; 1970 1971 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 1972 &xonly_pubkey) > 0; 1973 if (!ok) return 0; 1974 1975 return 1; 1976 } 1977 1978 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 1979 const unsigned char *pubkey, 1980 uint64_t fetched_at) 1981 { 1982 int rc; 1983 MDB_val key, val; 1984 1985 key.mv_data = (unsigned char*)pubkey; 1986 key.mv_size = 32; 1987 val.mv_data = &fetched_at; 1988 val.mv_size = sizeof(fetched_at); 1989 1990 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 1991 &key, &val, 0))) 1992 { 1993 ndb_debug("write version to ndb_meta failed: %s\n", 1994 mdb_strerror(rc)); 1995 return; 1996 } 1997 1998 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 1999 } 2000 2001 2002 // We just received a profile that we haven't processed yet, but it could 2003 // be an older one! Make sure we only write last fetched profile if it's a new 2004 // one 2005 // 2006 // To do this, we first check the latest profile in the database. If the 2007 // created_date for this profile note is newer, then we write a 2008 // last_profile_fetch record, otherwise we do not. 2009 // 2010 // WARNING: This function is only valid when called from the writer thread 2011 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 2012 struct ndb_note *note) 2013 { 2014 size_t len; 2015 uint64_t profile_key, note_key; 2016 void *root; 2017 struct ndb_note *last_profile; 2018 NdbProfileRecord_table_t record; 2019 2020 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 2021 record = NdbProfileRecord_as_root(root); 2022 note_key = NdbProfileRecord_note_key(record); 2023 last_profile = ndb_get_note_by_key(txn, note_key, &len); 2024 if (last_profile == NULL) { 2025 return 0; 2026 } 2027 2028 // found profile, let's see if it's newer than ours 2029 if (note->created_at > last_profile->created_at) { 2030 // this is a new profile note, record last fetched time 2031 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 2032 } 2033 } else { 2034 // couldn't fetch profile. record last fetched time 2035 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 2036 } 2037 2038 return 1; 2039 } 2040 2041 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 2042 uint64_t fetched_at) 2043 { 2044 struct ndb_writer_msg msg; 2045 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 2046 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 2047 msg.last_fetch.fetched_at = fetched_at; 2048 2049 return ndb_writer_queue_msg(&ndb->writer, &msg); 2050 } 2051 2052 2053 // When doing cursor scans from greatest to lowest, this function positions the 2054 // cursor at the first element before descending. MDB_SET_RANGE puts us right 2055 // after the first element, so we have to go back one. 2056 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v) 2057 { 2058 // Position cursor at the next key greater than or equal to the 2059 // specified key 2060 if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) { 2061 // Failed :(. It could be the last element? 2062 if (mdb_cursor_get(cur, k, v, MDB_LAST)) 2063 return 0; 2064 } else { 2065 // if set range worked and our key exists, it should be 2066 // the one right before this one 2067 if (mdb_cursor_get(cur, k, v, MDB_PREV)) 2068 return 0; 2069 } 2070 2071 return 1; 2072 } 2073 2074 // get some value based on a clustered id key 2075 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 2076 MDB_val *val) 2077 { 2078 MDB_val k, v; 2079 MDB_cursor *cur; 2080 int success = 0, rc; 2081 struct ndb_tsid tsid; 2082 2083 // position at the most recent 2084 ndb_tsid_high(&tsid, id); 2085 2086 k.mv_data = &tsid; 2087 k.mv_size = sizeof(tsid); 2088 2089 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 2090 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 2091 return 0; 2092 } 2093 2094 if (!ndb_cursor_start(cur, &k, &v)) 2095 goto cleanup; 2096 2097 if (memcmp(k.mv_data, id, 32) == 0) { 2098 *val = v; 2099 success = 1; 2100 } 2101 2102 cleanup: 2103 mdb_cursor_close(cur); 2104 return success; 2105 } 2106 2107 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 2108 enum ndb_dbs store, size_t *len) 2109 { 2110 MDB_val k, v; 2111 2112 k.mv_data = &key; 2113 k.mv_size = sizeof(key); 2114 2115 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2116 ndb_debug("ndb_lookup_by_key: mdb_get note failed\n"); 2117 return NULL; 2118 } 2119 2120 if (len) 2121 *len = v.mv_size; 2122 2123 return v.mv_data; 2124 } 2125 2126 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 2127 enum ndb_dbs store, const unsigned char *pk, 2128 size_t *len, uint64_t *primkey) 2129 { 2130 MDB_val k, v; 2131 void *res = NULL; 2132 if (len) 2133 *len = 0; 2134 2135 if (!ndb_get_tsid(txn, ind, pk, &k)) { 2136 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 2137 return 0; 2138 } 2139 2140 if (primkey) 2141 *primkey = *(uint64_t*)k.mv_data; 2142 2143 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2144 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 2145 return 0; 2146 } 2147 2148 res = v.mv_data; 2149 assert(((uint64_t)res % 4) == 0); 2150 if (len) 2151 *len = v.mv_size; 2152 return res; 2153 } 2154 2155 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 2156 { 2157 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 2158 } 2159 2160 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 2161 { 2162 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key); 2163 } 2164 2165 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 2166 enum ndb_dbs db, 2167 const unsigned char *id) 2168 { 2169 MDB_val k; 2170 2171 if (!ndb_get_tsid(txn, db, id, &k)) 2172 return 0; 2173 2174 return *(uint32_t*)k.mv_data; 2175 } 2176 2177 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 2178 { 2179 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 2180 } 2181 2182 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 2183 { 2184 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 2185 } 2186 2187 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2188 { 2189 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 2190 } 2191 2192 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2193 { 2194 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 2195 } 2196 2197 uint64_t 2198 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 2199 { 2200 MDB_val k, v; 2201 2202 k.mv_data = (unsigned char*)pubkey; 2203 k.mv_size = 32; 2204 2205 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 2206 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 2207 return 0; 2208 } 2209 2210 return *((uint64_t*)v.mv_data); 2211 } 2212 2213 2214 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 2215 { 2216 MDB_val val; 2217 2218 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 2219 return 0; 2220 2221 return 1; 2222 } 2223 2224 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 2225 MDB_txn *mdb_txn) 2226 { 2227 txn->lmdb = lmdb; 2228 txn->mdb_txn = mdb_txn; 2229 } 2230 2231 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 2232 { 2233 unsigned char id[32]; 2234 struct ndb_ingest_controller *c = data; 2235 struct ndb_txn txn; 2236 2237 hex_decode(hexid, 64, id, sizeof(id)); 2238 2239 // let's see if we already have it 2240 2241 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 2242 if (!ndb_has_note(&txn, id)) 2243 return NDB_IDRES_CONT; 2244 2245 return NDB_IDRES_STOP; 2246 } 2247 2248 static int ndbprofile_parse_json(flatcc_builder_t *B, 2249 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 2250 { 2251 flatcc_json_parser_t parser, *ctx = &parser; 2252 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 2253 2254 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 2255 return 0; 2256 2257 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 2258 if (ctx->error) 2259 return 0; 2260 2261 if (!flatcc_builder_end_buffer(B, *profile)) 2262 return 0; 2263 2264 ctx->end_loc = buf; 2265 2266 2267 return 1; 2268 } 2269 2270 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 2271 { 2272 b->builder = malloc(sizeof(*b->builder)); 2273 b->flatbuf = NULL; 2274 } 2275 2276 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 2277 { 2278 if (b->builder) 2279 free(b->builder); 2280 if (b->flatbuf) 2281 free(b->flatbuf); 2282 2283 b->builder = NULL; 2284 b->flatbuf = NULL; 2285 } 2286 2287 int ndb_process_profile_note(struct ndb_note *note, 2288 struct ndb_profile_record_builder *profile) 2289 { 2290 int res; 2291 2292 NdbProfile_ref_t profile_table; 2293 flatcc_builder_t *builder; 2294 2295 ndb_profile_record_builder_init(profile); 2296 builder = profile->builder; 2297 flatcc_builder_init(builder); 2298 2299 NdbProfileRecord_start_as_root(builder); 2300 2301 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 2302 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 2303 note->content_length, 2304 flatcc_json_parser_f_skip_unknown, 2305 &profile_table))) 2306 { 2307 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 2308 note->content_length, ndb_note_content(note)); 2309 ndb_profile_record_builder_free(profile); 2310 return 0; 2311 } 2312 2313 uint64_t received_at = time(NULL); 2314 const char *lnurl = "fixme"; 2315 2316 NdbProfileRecord_profile_add(builder, profile_table); 2317 NdbProfileRecord_received_at_add(builder, received_at); 2318 2319 flatcc_builder_ref_t lnurl_off; 2320 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 2321 2322 NdbProfileRecord_lnurl_add(builder, lnurl_off); 2323 2324 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 2325 return 1; 2326 } 2327 2328 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 2329 char *json, unsigned len, unsigned client) 2330 { 2331 struct ndb_ingester_msg msg; 2332 msg.type = NDB_INGEST_EVENT; 2333 2334 msg.event.json = json; 2335 msg.event.len = len; 2336 msg.event.client = client; 2337 2338 return threadpool_dispatch(&ingester->tp, &msg); 2339 } 2340 2341 2342 static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json, 2343 int len, unsigned client) 2344 { 2345 // Since we need to return as soon as possible, and we're not 2346 // making any assumptions about the lifetime of the string, we 2347 // definitely need to copy the json here. In the future once we 2348 // have our thread that manages a websocket connection, we can 2349 // avoid the copy and just use the buffer we get from that 2350 // thread. 2351 char *json_copy = strdupn(json, len); 2352 if (json_copy == NULL) 2353 return 0; 2354 2355 return ndb_ingester_queue_event(ingester, json_copy, len, client); 2356 } 2357 2358 2359 static int ndb_ingester_process_note(secp256k1_context *ctx, 2360 struct ndb_note *note, 2361 size_t note_size, 2362 struct ndb_writer_msg *out, 2363 struct ndb_ingester *ingester) 2364 { 2365 enum ndb_ingest_filter_action action; 2366 action = NDB_INGEST_ACCEPT; 2367 2368 if (ingester->filter) 2369 action = ingester->filter(ingester->filter_context, note); 2370 2371 if (action == NDB_INGEST_REJECT) 2372 return 0; 2373 2374 // some special situations we might want to skip sig validation, 2375 // like during large imports 2376 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 2377 // if we're skipping validation we don't need to verify 2378 } else { 2379 // verify! If it's an invalid note we don't need to 2380 // bother writing it to the database 2381 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 2382 ndb_debug("signature verification failed\n"); 2383 return 0; 2384 } 2385 } 2386 2387 // we didn't find anything. let's send it 2388 // to the writer thread 2389 note = realloc(note, note_size); 2390 assert(((uint64_t)note % 4) == 0); 2391 2392 if (note->kind == 0) { 2393 struct ndb_profile_record_builder *b = 2394 &out->profile.record; 2395 2396 ndb_process_profile_note(note, b); 2397 2398 out->type = NDB_WRITER_PROFILE; 2399 out->profile.note.note = note; 2400 out->profile.note.note_len = note_size; 2401 return 1; 2402 } else if (note->kind == 6) { 2403 // process the repost if we have a repost event 2404 ndb_debug("processing kind 6 repost\n"); 2405 ndb_ingest_event(ingester, ndb_note_content(note), 2406 ndb_note_content_length(note), 0); 2407 } 2408 2409 out->type = NDB_WRITER_NOTE; 2410 out->note.note = note; 2411 out->note.note_len = note_size; 2412 2413 return 1; 2414 } 2415 2416 2417 static int ndb_ingester_process_event(secp256k1_context *ctx, 2418 struct ndb_ingester *ingester, 2419 struct ndb_ingester_event *ev, 2420 struct ndb_writer_msg *out, 2421 MDB_txn *read_txn 2422 ) 2423 { 2424 struct ndb_tce tce; 2425 struct ndb_fce fce; 2426 struct ndb_note *note; 2427 struct ndb_ingest_controller controller; 2428 struct ndb_id_cb cb; 2429 void *buf; 2430 int ok; 2431 size_t bufsize, note_size; 2432 2433 ok = 0; 2434 2435 // we will use this to check if we already have it in the DB during 2436 // ID parsing 2437 controller.read_txn = read_txn; 2438 controller.lmdb = ingester->lmdb; 2439 cb.fn = ndb_ingester_json_controller; 2440 cb.data = &controller; 2441 2442 // since we're going to be passing this allocated note to a different 2443 // thread, we can't use thread-local buffers. just allocate a block 2444 bufsize = max(ev->len * 8.0, 4096); 2445 buf = malloc(bufsize); 2446 if (!buf) { 2447 ndb_debug("couldn't malloc buf\n"); 2448 return 0; 2449 } 2450 2451 note_size = 2452 ev->client ? 2453 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 2454 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 2455 2456 if ((int)note_size == -42) { 2457 // we already have this! 2458 //ndb_debug("already have id??\n"); 2459 goto cleanup; 2460 } else if (note_size == 0) { 2461 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 2462 goto cleanup; 2463 } 2464 2465 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 2466 2467 if (ev->client) { 2468 switch (fce.evtype) { 2469 case NDB_FCE_EVENT: 2470 note = fce.event.note; 2471 if (note != buf) { 2472 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2473 goto cleanup; 2474 } 2475 2476 if (!ndb_ingester_process_note(ctx, note, note_size, 2477 out, ingester)) { 2478 ndb_debug("failed to process note\n"); 2479 goto cleanup; 2480 } else { 2481 // we're done with the original json, free it 2482 free(ev->json); 2483 return 1; 2484 } 2485 } 2486 } else { 2487 switch (tce.evtype) { 2488 case NDB_TCE_AUTH: goto cleanup; 2489 case NDB_TCE_NOTICE: goto cleanup; 2490 case NDB_TCE_EOSE: goto cleanup; 2491 case NDB_TCE_OK: goto cleanup; 2492 case NDB_TCE_EVENT: 2493 note = tce.event.note; 2494 if (note != buf) { 2495 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2496 goto cleanup; 2497 } 2498 2499 if (!ndb_ingester_process_note(ctx, note, note_size, 2500 out, ingester)) { 2501 ndb_debug("failed to process note\n"); 2502 goto cleanup; 2503 } else { 2504 // we're done with the original json, free it 2505 free(ev->json); 2506 return 1; 2507 } 2508 } 2509 } 2510 2511 2512 cleanup: 2513 free(ev->json); 2514 free(buf); 2515 2516 return ok; 2517 } 2518 2519 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 2520 { 2521 MDB_cursor *mc; 2522 MDB_val key, val; 2523 2524 if (mdb_cursor_open(txn, db, &mc)) 2525 return 0; 2526 2527 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 2528 mdb_cursor_close(mc); 2529 return 0; 2530 } 2531 2532 mdb_cursor_close(mc); 2533 2534 assert(key.mv_size == 8); 2535 return *((uint64_t*)key.mv_data); 2536 } 2537 2538 // 2539 // make a search key meant for user queries without any other note info 2540 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 2541 { 2542 memset(key->id, 0, sizeof(key->id)); 2543 key->timestamp = 0; 2544 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 2545 key->search[sizeof(key->search) - 1] = '\0'; 2546 } 2547 2548 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 2549 { 2550 int rc; 2551 struct ndb_search_key s; 2552 MDB_val k, v; 2553 search->cursor = NULL; 2554 2555 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 2556 2557 ndb_make_search_key_low(&s, query); 2558 2559 k.mv_data = &s; 2560 k.mv_size = sizeof(s); 2561 2562 if ((rc = mdb_cursor_open(txn->mdb_txn, 2563 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 2564 cursor))) { 2565 printf("search_profile: cursor opened failed: %s\n", 2566 mdb_strerror(rc)); 2567 return 0; 2568 } 2569 2570 // Position cursor at the next key greater than or equal to the specified key 2571 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 2572 printf("search_profile: cursor get failed\n"); 2573 goto cleanup; 2574 } else { 2575 search->key = k.mv_data; 2576 assert(v.mv_size == 8); 2577 search->profile_key = *((uint64_t*)v.mv_data); 2578 return 1; 2579 } 2580 2581 cleanup: 2582 mdb_cursor_close(search->cursor); 2583 search->cursor = NULL; 2584 return 0; 2585 } 2586 2587 void ndb_search_profile_end(struct ndb_search *search) 2588 { 2589 if (search->cursor) 2590 mdb_cursor_close(search->cursor); 2591 } 2592 2593 int ndb_search_profile_next(struct ndb_search *search) 2594 { 2595 int rc; 2596 MDB_val k, v; 2597 unsigned char *init_id; 2598 2599 init_id = search->key->id; 2600 k.mv_data = search->key; 2601 k.mv_size = sizeof(*search->key); 2602 2603 retry: 2604 if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) { 2605 ndb_debug("ndb_search_profile_next: %s\n", 2606 mdb_strerror(rc)); 2607 return 0; 2608 } else { 2609 search->key = k.mv_data; 2610 assert(v.mv_size == 8); 2611 search->profile_key = *((uint64_t*)v.mv_data); 2612 2613 // skip duplicate pubkeys 2614 if (!memcmp(init_id, search->key->id, 32)) 2615 goto retry; 2616 } 2617 2618 return 1; 2619 } 2620 2621 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 2622 { 2623 int cmp; 2624 struct ndb_search_key *ska, *skb; 2625 2626 ska = a->mv_data; 2627 skb = b->mv_data; 2628 2629 MDB_val a2 = *a; 2630 MDB_val b2 = *b; 2631 2632 a2.mv_data = ska->search; 2633 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 2634 2635 cmp = mdb_cmp_memn(&a2, &b2); 2636 if (cmp) return cmp; 2637 2638 if (ska->timestamp < skb->timestamp) 2639 return -1; 2640 else if (ska->timestamp > skb->timestamp) 2641 return 1; 2642 2643 return 0; 2644 } 2645 2646 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 2647 2648 { 2649 MDB_val key, val; 2650 int rc; 2651 struct ndb_tsid tsid; 2652 MDB_dbi pk_db; 2653 2654 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 2655 2656 // write profile_pk + created_at index 2657 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 2658 2659 key.mv_data = &tsid; 2660 key.mv_size = sizeof(tsid); 2661 val.mv_data = &profile_key; 2662 val.mv_size = sizeof(profile_key); 2663 2664 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 2665 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 2666 profile_key, mdb_strerror(rc)); 2667 return 0; 2668 } 2669 2670 return 1; 2671 } 2672 2673 static int ndb_write_profile(struct ndb_txn *txn, 2674 struct ndb_writer_profile *profile, 2675 uint64_t note_key) 2676 { 2677 uint64_t profile_key; 2678 struct ndb_note *note; 2679 void *flatbuf; 2680 size_t flatbuf_len; 2681 int rc; 2682 2683 MDB_val key, val; 2684 MDB_dbi profile_db; 2685 2686 note = profile->note.note; 2687 2688 // add note_key to profile record 2689 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 2690 NdbProfileRecord_end_as_root(profile->record.builder); 2691 2692 flatbuf = profile->record.flatbuf = 2693 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 2694 2695 assert(((uint64_t)flatbuf % 8) == 0); 2696 2697 // TODO: this may not be safe!? 2698 flatbuf_len = (flatbuf_len + 7) & ~7; 2699 2700 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 2701 2702 // get dbs 2703 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 2704 2705 // get new key 2706 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 2707 2708 // write profile to profile store 2709 key.mv_data = &profile_key; 2710 key.mv_size = sizeof(profile_key); 2711 val.mv_data = flatbuf; 2712 val.mv_size = flatbuf_len; 2713 2714 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 2715 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 2716 return 0; 2717 } 2718 2719 // write last fetched record 2720 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 2721 ndb_debug("failed to write last profile fetched record\n"); 2722 } 2723 2724 // write profile pubkey index 2725 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 2726 ndb_debug("failed to write profile pubkey index\n"); 2727 return 0; 2728 } 2729 2730 // write name, display_name profile search indices 2731 if (!ndb_write_profile_search_indices(txn, note, profile_key, 2732 flatbuf)) { 2733 ndb_debug("failed to write profile search indices\n"); 2734 return 0; 2735 } 2736 2737 return 1; 2738 } 2739 2740 // find the last id tag in a note (e, p, etc) 2741 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 2742 { 2743 unsigned char *last = NULL; 2744 struct ndb_iterator iter; 2745 struct ndb_str str; 2746 2747 // get the liked event id (last id) 2748 ndb_tags_iterate_start(note, &iter); 2749 2750 while (ndb_tags_iterate_next(&iter)) { 2751 if (iter.tag->count < 2) 2752 continue; 2753 2754 str = ndb_tag_str(note, iter.tag, 0); 2755 2756 // assign liked to the last e tag 2757 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 2758 str = ndb_tag_str(note, iter.tag, 1); 2759 if (str.flag == NDB_PACKED_ID) 2760 last = str.id; 2761 } 2762 } 2763 2764 return last; 2765 } 2766 2767 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 2768 { 2769 MDB_val k, v; 2770 2771 k.mv_data = (unsigned char*)id; 2772 k.mv_size = 32; 2773 2774 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 2775 //ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 2776 return NULL; 2777 } 2778 2779 if (len) 2780 *len = v.mv_size; 2781 2782 return v.mv_data; 2783 } 2784 2785 // When receiving a reaction note, look for the liked id and increase the 2786 // reaction counter in the note metadata database 2787 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 2788 { 2789 size_t len; 2790 void *root; 2791 int reactions, rc; 2792 MDB_val key, val; 2793 NdbEventMeta_table_t meta; 2794 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 2795 2796 if (liked == NULL) 2797 return 0; 2798 2799 root = ndb_get_note_meta(txn, liked, &len); 2800 2801 flatcc_builder_t builder; 2802 flatcc_builder_init(&builder); 2803 NdbEventMeta_start_as_root(&builder); 2804 2805 // no meta record, let's make one 2806 if (root == NULL) { 2807 NdbEventMeta_reactions_add(&builder, 1); 2808 } else { 2809 // clone existing and add to it 2810 meta = NdbEventMeta_as_root(root); 2811 2812 reactions = NdbEventMeta_reactions_get(meta); 2813 NdbEventMeta_clone(&builder, meta); 2814 NdbEventMeta_reactions_add(&builder, reactions + 1); 2815 } 2816 2817 NdbProfileRecord_end_as_root(&builder); 2818 root = flatcc_builder_finalize_aligned_buffer(&builder, &len); 2819 assert(((uint64_t)root % 8) == 0); 2820 2821 if (root == NULL) { 2822 ndb_debug("failed to create note metadata record\n"); 2823 return 0; 2824 } 2825 2826 // metadata is keyed on id because we want to collect stats regardless 2827 // if we have the note yet or not 2828 key.mv_data = liked; 2829 key.mv_size = 32; 2830 2831 val.mv_data = root; 2832 val.mv_size = len; 2833 2834 // write the new meta record 2835 //ndb_debug("writing stats record for "); 2836 //print_hex(liked, 32); 2837 //ndb_debug("\n"); 2838 2839 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 2840 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 2841 free(root); 2842 return 0; 2843 } 2844 2845 free(root); 2846 2847 return 1; 2848 } 2849 2850 2851 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 2852 uint64_t note_key) 2853 2854 { 2855 struct ndb_tsid tsid; 2856 int rc; 2857 MDB_val key, val; 2858 MDB_dbi id_db; 2859 2860 ndb_tsid_init(&tsid, note->id, note->created_at); 2861 2862 key.mv_data = &tsid; 2863 key.mv_size = sizeof(tsid); 2864 val.mv_data = ¬e_key; 2865 val.mv_size = sizeof(note_key); 2866 2867 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2868 2869 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 2870 ndb_debug("write note id index to db failed: %s\n", 2871 mdb_strerror(rc)); 2872 return 0; 2873 } 2874 2875 return 1; 2876 } 2877 2878 static int ndb_filter_group_add_filters(struct ndb_filter_group *group, 2879 struct ndb_filter *filters, 2880 int num_filters) 2881 { 2882 int i; 2883 2884 for (i = 0; i < num_filters; i++) { 2885 if (!ndb_filter_group_add(group, &filters[i])) 2886 return 0; 2887 } 2888 2889 return 1; 2890 } 2891 2892 2893 static struct ndb_filter_elements * 2894 ndb_filter_find_elements(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2895 { 2896 int i; 2897 struct ndb_filter_elements *els; 2898 2899 for (i = 0; i < filter->num_elements; i++) { 2900 els = ndb_filter_get_elements(filter, i); 2901 assert(els); 2902 if (els->field.type == typ) { 2903 return els; 2904 } 2905 } 2906 2907 return NULL; 2908 } 2909 2910 static const char *ndb_filter_find_search(struct ndb_filter *filter) 2911 { 2912 struct ndb_filter_elements *els; 2913 2914 if (!(els = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH))) 2915 return NULL; 2916 2917 return ndb_filter_get_string_element(filter, els, 0); 2918 } 2919 2920 int ndb_filter_is_subset_of(const struct ndb_filter *a, const struct ndb_filter *b) 2921 { 2922 int i; 2923 struct ndb_filter_elements *b_field, *a_field; 2924 2925 // Everything is always a subset of {} 2926 if (b->num_elements == 0) 2927 return 1; 2928 2929 // We can't be a subset if the number of elements in the other 2930 // filter is larger then the number of elements we have. 2931 if (b->num_elements > a->num_elements) 2932 return 0; 2933 2934 // If our filter count matches, we can only be a subset if we are 2935 // equal 2936 if (b->num_elements == a->num_elements) 2937 return ndb_filter_eq(a, b); 2938 2939 // If our element count is larger than the other filter, then we 2940 // must see if every element in the other filter exists in ours. If 2941 // so, then we are a subset of the other. 2942 // 2943 // eg: B={k:1, a:b} <- A={t:x, k:1, a:b} 2944 // 2945 // A is a subset of B because `k:1` and `a:b` both exist in A 2946 2947 for (i = 0; i < b->num_elements; i++) { 2948 b_field = ndb_filter_get_elements((struct ndb_filter*)b, i); 2949 a_field = ndb_filter_find_elements((struct ndb_filter*)a, 2950 b_field->field.type); 2951 2952 if (a_field == NULL) 2953 return 0; 2954 2955 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_field, 2956 (struct ndb_filter*)b, b_field)) 2957 return 0; 2958 } 2959 2960 return 1; 2961 } 2962 2963 int ndb_filter_eq(const struct ndb_filter *a, const struct ndb_filter *b) 2964 { 2965 int i; 2966 struct ndb_filter_elements *a_els, *b_els; 2967 2968 if (a->num_elements != b->num_elements) 2969 return 0; 2970 2971 for (i = 0; i < a->num_elements; i++) { 2972 a_els = ndb_filter_get_elements((struct ndb_filter*)a, i); 2973 b_els = ndb_filter_find_elements((struct ndb_filter *)b, 2974 a_els->field.type); 2975 2976 if (b_els == NULL) 2977 return 0; 2978 2979 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_els, 2980 (struct ndb_filter*)b, b_els)) 2981 return 0; 2982 } 2983 2984 return 1; 2985 } 2986 2987 2988 static uint64_t * 2989 ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2990 { 2991 struct ndb_filter_elements *els; 2992 if ((els = ndb_filter_find_elements(filter, typ))) 2993 return &els->elements[0]; 2994 return NULL; 2995 } 2996 2997 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter, 2998 enum ndb_filter_fieldtype typ) 2999 { 3000 uint64_t *el; 3001 if (NULL == (el = ndb_filter_get_elem(filter, typ))) 3002 return NULL; 3003 return el; 3004 } 3005 3006 static inline int push_query_result(struct ndb_query_results *results, 3007 struct ndb_query_result *result) 3008 { 3009 return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result)); 3010 } 3011 3012 static int compare_query_results(const void *pa, const void *pb) 3013 { 3014 struct ndb_query_result *a, *b; 3015 3016 a = (struct ndb_query_result *)pa; 3017 b = (struct ndb_query_result *)pb; 3018 3019 if (a->note->created_at == b->note->created_at) { 3020 return 0; 3021 } else if (a->note->created_at > b->note->created_at) { 3022 return -1; 3023 } else { 3024 return 1; 3025 } 3026 } 3027 3028 static void ndb_query_result_init(struct ndb_query_result *res, 3029 struct ndb_note *note, 3030 uint64_t note_size, 3031 uint64_t note_id) 3032 { 3033 *res = (struct ndb_query_result){ 3034 .note_id = note_id, 3035 .note_size = note_size, 3036 .note = note, 3037 }; 3038 } 3039 3040 static int query_is_full(struct ndb_query_results *results, int limit) 3041 { 3042 if (results->cur.p >= results->cur.end) 3043 return 1; 3044 3045 return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit; 3046 } 3047 3048 static int ndb_query_plan_execute_search(struct ndb_txn *txn, 3049 struct ndb_filter *filter, 3050 struct ndb_query_results *results, 3051 int limit) 3052 { 3053 const char *search; 3054 int i; 3055 struct ndb_text_search_results text_results; 3056 struct ndb_text_search_result *text_result; 3057 struct ndb_text_search_config config; 3058 struct ndb_query_result result; 3059 3060 ndb_default_text_search_config(&config); 3061 3062 if (!(search = ndb_filter_find_search(filter))) 3063 return 0; 3064 3065 if (!ndb_text_search_with(txn, search, &text_results, &config, filter)) 3066 return 0; 3067 3068 for (i = 0; i < text_results.num_results; i++) { 3069 if (query_is_full(results, limit)) 3070 break; 3071 3072 text_result = &text_results.results[i]; 3073 3074 result.note = text_result->note; 3075 result.note_size = text_result->note_size; 3076 result.note_id = text_result->key.note_id; 3077 3078 if (!push_query_result(results, &result)) 3079 break; 3080 } 3081 3082 return 1; 3083 } 3084 3085 static int ndb_query_plan_execute_ids(struct ndb_txn *txn, 3086 struct ndb_filter *filter, 3087 struct ndb_query_results *results, 3088 int limit) 3089 { 3090 MDB_cursor *cur; 3091 MDB_dbi db; 3092 MDB_val k, v; 3093 int rc, i; 3094 struct ndb_filter_elements *ids; 3095 struct ndb_note *note; 3096 struct ndb_query_result res; 3097 struct ndb_tsid tsid, *ptsid; 3098 uint64_t note_id, until, *pint; 3099 size_t note_size; 3100 unsigned char *id; 3101 3102 until = UINT64_MAX; 3103 3104 if (!(ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS))) 3105 return 0; 3106 3107 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3108 until = *pint; 3109 3110 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3111 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3112 return 0; 3113 3114 // for each id in our ids filter, find in the db 3115 for (i = 0; i < ids->count; i++) { 3116 if (query_is_full(results, limit)) 3117 break; 3118 3119 id = ndb_filter_get_id_element(filter, ids, i); 3120 ndb_tsid_init(&tsid, (unsigned char *)id, until); 3121 3122 k.mv_data = &tsid; 3123 k.mv_size = sizeof(tsid); 3124 3125 if (!ndb_cursor_start(cur, &k, &v)) 3126 continue; 3127 3128 ptsid = (struct ndb_tsid *)k.mv_data; 3129 note_id = *(uint64_t*)v.mv_data; 3130 3131 if (memcmp(id, ptsid->id, 32)) 3132 continue; 3133 3134 // get the note because we need it to match against the filter 3135 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3136 continue; 3137 3138 // Sure this particular lookup matched the index query, but 3139 // does it match the entire filter? Check! We also pass in 3140 // things we've already matched via the filter so we don't have 3141 // to check again. This can be pretty important for filters 3142 // with a large number of entries. 3143 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS)) 3144 continue; 3145 3146 ndb_query_result_init(&res, note, note_size, note_id); 3147 if (!push_query_result(results, &res)) 3148 break; 3149 } 3150 3151 mdb_cursor_close(cur); 3152 return 1; 3153 } 3154 3155 // 3156 // encode a tag index key 3157 // 3158 // consists of: 3159 // 3160 // u8 tag 3161 // u8 tag_val_len 3162 // [u8] tag_val_bytes 3163 // u64 created_at 3164 // 3165 static int ndb_encode_tag_key(unsigned char *buf, int buf_size, 3166 char tag, const unsigned char *val, 3167 unsigned char val_len, 3168 uint64_t timestamp) 3169 { 3170 struct cursor writer; 3171 int ok; 3172 3173 // quick exit for obvious case where it will be too big. There can be 3174 // values of val_len that still fail, but we just let the writer handle 3175 // those failure cases 3176 if (val_len >= buf_size) 3177 return 0; 3178 3179 make_cursor(buf, buf + buf_size, &writer); 3180 3181 ok = 3182 cursor_push_byte(&writer, tag) && 3183 cursor_push(&writer, (unsigned char*)val, val_len) && 3184 cursor_push(&writer, (unsigned char*)×tamp, sizeof(timestamp)); 3185 3186 if (!ok) 3187 return 0; 3188 3189 return writer.p - writer.start; 3190 } 3191 3192 static int ndb_query_plan_execute_authors(struct ndb_txn *txn, 3193 struct ndb_filter *filter, 3194 struct ndb_query_results *results, 3195 int limit) 3196 { 3197 MDB_val k, v; 3198 MDB_cursor *cur; 3199 int rc, i; 3200 uint64_t *pint, until, since, note_key; 3201 unsigned char *author; 3202 struct ndb_note *note; 3203 size_t note_size; 3204 struct ndb_filter_elements *authors; 3205 struct ndb_query_result res; 3206 struct ndb_tsid tsid, *ptsid; 3207 enum ndb_dbs db; 3208 3209 db = txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY]; 3210 3211 if (!(authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS))) 3212 return 0; 3213 3214 until = UINT64_MAX; 3215 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3216 until = *pint; 3217 3218 since = 0; 3219 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3220 since = *pint; 3221 3222 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3223 return 0; 3224 3225 for (i = 0; i < authors->count; i++) { 3226 author = ndb_filter_get_id_element(filter, authors, i); 3227 3228 ndb_tsid_init(&tsid, author, until); 3229 3230 k.mv_data = &tsid; 3231 k.mv_size = sizeof(tsid); 3232 3233 if (!ndb_cursor_start(cur, &k, &v)) 3234 continue; 3235 3236 // for each id in our ids filter, find in the db 3237 while (!query_is_full(results, limit)) { 3238 ptsid = (struct ndb_tsid *)k.mv_data; 3239 note_key = *(uint64_t*)v.mv_data; 3240 3241 // don't continue the scan if we're below `since` 3242 if (ptsid->timestamp < since) 3243 break; 3244 3245 // our author should match, if not bail 3246 if (memcmp(author, ptsid->id, 32)) 3247 break; 3248 3249 // fetch the note, we need it for our query results 3250 // and to match further against the filter 3251 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_size))) 3252 goto next; 3253 3254 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_AUTHORS)) 3255 goto next; 3256 3257 ndb_query_result_init(&res, note, note_size, note_key); 3258 if (!push_query_result(results, &res)) 3259 break; 3260 3261 next: 3262 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3263 break; 3264 } 3265 } 3266 3267 mdb_cursor_close(cur); 3268 return 1; 3269 } 3270 3271 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn, 3272 struct ndb_filter *filter, 3273 struct ndb_query_results *results, 3274 int limit) 3275 { 3276 MDB_dbi db; 3277 MDB_val k, v; 3278 MDB_cursor *cur; 3279 int rc; 3280 struct ndb_note *note; 3281 struct ndb_tsid key, *pkey; 3282 uint64_t *pint, until, since, note_id; 3283 size_t note_size; 3284 struct ndb_query_result res; 3285 unsigned char high_key[32] = {0xFF}; 3286 3287 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3288 3289 until = UINT64_MAX; 3290 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3291 until = *pint; 3292 3293 since = 0; 3294 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3295 since = *pint; 3296 3297 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3298 return 0; 3299 3300 // if we have until, start there, otherwise just use max 3301 ndb_tsid_init(&key, high_key, until); 3302 k.mv_data = &key; 3303 k.mv_size = sizeof(key); 3304 3305 if (!ndb_cursor_start(cur, &k, &v)) 3306 return 1; 3307 3308 while (!query_is_full(results, limit)) { 3309 pkey = (struct ndb_tsid *)k.mv_data; 3310 note_id = *(uint64_t*)v.mv_data; 3311 assert(v.mv_size == 8); 3312 3313 // don't continue the scan if we're below `since` 3314 if (pkey->timestamp < since) 3315 break; 3316 3317 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3318 goto next; 3319 3320 // does this entry match our filter? 3321 if (!ndb_filter_matches_with(filter, note, 0)) 3322 goto next; 3323 3324 ndb_query_result_init(&res, note, (uint64_t)note_size, note_id); 3325 if (!push_query_result(results, &res)) 3326 break; 3327 next: 3328 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3329 break; 3330 } 3331 3332 mdb_cursor_close(cur); 3333 return 1; 3334 } 3335 3336 static int ndb_query_plan_execute_tags(struct ndb_txn *txn, 3337 struct ndb_filter *filter, 3338 struct ndb_query_results *results, 3339 int limit) 3340 { 3341 MDB_cursor *cur; 3342 MDB_dbi db; 3343 MDB_val k, v; 3344 int len, taglen, rc, i; 3345 uint64_t *pint, until, note_id; 3346 size_t note_size; 3347 unsigned char key_buffer[255]; 3348 struct ndb_note *note; 3349 struct ndb_filter_elements *tags; 3350 unsigned char *tag; 3351 struct ndb_query_result res; 3352 3353 db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3354 3355 if (!(tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS))) 3356 return 0; 3357 3358 until = UINT64_MAX; 3359 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3360 until = *pint; 3361 3362 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3363 return 0; 3364 3365 for (i = 0; i < tags->count; i++) { 3366 tag = ndb_filter_get_id_element(filter, tags, i); 3367 3368 taglen = tags->field.elem_type == NDB_ELEMENT_ID 3369 ? 32 : strlen((const char*)tag); 3370 3371 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3372 tags->field.tag, tag, taglen, 3373 until))) 3374 return 0; 3375 3376 k.mv_data = key_buffer; 3377 k.mv_size = len; 3378 3379 if (!ndb_cursor_start(cur, &k, &v)) 3380 continue; 3381 3382 // for each id in our ids filter, find in the db 3383 while (!query_is_full(results, limit)) { 3384 // check if tag value matches, bail if not 3385 if (((unsigned char *)k.mv_data)[0] != tags->field.tag) 3386 break; 3387 3388 // check if tag value matches, bail if not 3389 if (taglen != k.mv_size - 9) 3390 break; 3391 3392 if (memcmp((unsigned char *)k.mv_data+1, tag, k.mv_size-9)) 3393 break; 3394 3395 note_id = *(uint64_t*)v.mv_data; 3396 3397 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3398 goto next; 3399 3400 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS)) 3401 goto next; 3402 3403 ndb_query_result_init(&res, note, note_size, note_id); 3404 if (!push_query_result(results, &res)) 3405 break; 3406 3407 next: 3408 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3409 break; 3410 } 3411 } 3412 3413 mdb_cursor_close(cur); 3414 return 1; 3415 } 3416 3417 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, 3418 struct ndb_filter *filter, 3419 struct ndb_query_results *results, 3420 int limit) 3421 { 3422 MDB_cursor *cur; 3423 MDB_dbi db; 3424 MDB_val k, v; 3425 struct ndb_note *note; 3426 struct ndb_u64_ts tsid, *ptsid; 3427 struct ndb_filter_elements *kinds; 3428 struct ndb_query_result res; 3429 uint64_t kind, note_id, until, since, *pint; 3430 size_t note_size; 3431 int i, rc; 3432 3433 // we should have kinds in a kinds filter! 3434 if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS))) 3435 return 0; 3436 3437 until = UINT64_MAX; 3438 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3439 until = *pint; 3440 3441 since = 0; 3442 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3443 since = *pint; 3444 3445 db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3446 3447 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3448 return 0; 3449 3450 for (i = 0; i < kinds->count; i++) { 3451 if (query_is_full(results, limit)) 3452 break; 3453 3454 kind = kinds->elements[i]; 3455 ndb_debug("kind %" PRIu64 "\n", kind); 3456 ndb_u64_ts_init(&tsid, kind, until); 3457 3458 k.mv_data = &tsid; 3459 k.mv_size = sizeof(tsid); 3460 3461 if (!ndb_cursor_start(cur, &k, &v)) 3462 continue; 3463 3464 // for each id in our ids filter, find in the db 3465 while (!query_is_full(results, limit)) { 3466 ptsid = (struct ndb_u64_ts *)k.mv_data; 3467 if (ptsid->u64 != kind) 3468 break; 3469 3470 // don't continue the scan if we're below `since` 3471 if (ptsid->timestamp < since) 3472 break; 3473 3474 note_id = *(uint64_t*)v.mv_data; 3475 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3476 goto next; 3477 3478 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS)) 3479 goto next; 3480 3481 ndb_query_result_init(&res, note, note_size, note_id); 3482 if (!push_query_result(results, &res)) 3483 break; 3484 3485 next: 3486 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3487 break; 3488 } 3489 } 3490 3491 mdb_cursor_close(cur); 3492 return 1; 3493 } 3494 3495 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter) 3496 { 3497 struct ndb_filter_elements *ids, *kinds, *authors, *tags, *search; 3498 3499 ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS); 3500 search = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH); 3501 kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS); 3502 authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS); 3503 tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS); 3504 3505 // this is rougly similar to the heuristic in strfry's dbscan 3506 if (search) { 3507 return NDB_PLAN_SEARCH; 3508 } else if (ids) { 3509 return NDB_PLAN_IDS; 3510 } else if (kinds && authors && authors->count <= 10) { 3511 return NDB_PLAN_AUTHOR_KINDS; 3512 } else if (authors && authors->count <= 10) { 3513 return NDB_PLAN_AUTHORS; 3514 } else if (tags && tags->count <= 10) { 3515 return NDB_PLAN_TAGS; 3516 } else if (kinds) { 3517 return NDB_PLAN_KINDS; 3518 } 3519 3520 return NDB_PLAN_CREATED; 3521 } 3522 3523 static const char *ndb_query_plan_name(int plan_id) 3524 { 3525 switch (plan_id) { 3526 case NDB_PLAN_IDS: return "ids"; 3527 case NDB_PLAN_SEARCH: return "search"; 3528 case NDB_PLAN_KINDS: return "kinds"; 3529 case NDB_PLAN_TAGS: return "tags"; 3530 case NDB_PLAN_CREATED: return "created"; 3531 case NDB_PLAN_AUTHORS: return "authors"; 3532 } 3533 3534 return "unknown"; 3535 } 3536 3537 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, 3538 struct ndb_query_result *res, int capacity, 3539 int *results_out) 3540 { 3541 struct ndb_query_results results; 3542 uint64_t limit, *pint; 3543 enum ndb_query_plan plan; 3544 limit = capacity; 3545 3546 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 3547 limit = *pint; 3548 3549 limit = min(capacity, limit); 3550 make_cursor((unsigned char *)res, 3551 ((unsigned char *)res) + limit * sizeof(*res), 3552 &results.cur); 3553 3554 plan = ndb_filter_plan(filter); 3555 ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan)); 3556 switch (plan) { 3557 // We have a list of ids, just open a cursor and jump to each once 3558 case NDB_PLAN_IDS: 3559 if (!ndb_query_plan_execute_ids(txn, filter, &results, limit)) 3560 return 0; 3561 break; 3562 3563 case NDB_PLAN_SEARCH: 3564 if (!ndb_query_plan_execute_search(txn, filter, &results, limit)) 3565 return 0; 3566 break; 3567 3568 // We have just kinds, just scan the kind index 3569 case NDB_PLAN_KINDS: 3570 if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit)) 3571 return 0; 3572 break; 3573 3574 case NDB_PLAN_TAGS: 3575 if (!ndb_query_plan_execute_tags(txn, filter, &results, limit)) 3576 return 0; 3577 break; 3578 case NDB_PLAN_CREATED: 3579 if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit)) 3580 return 0; 3581 break; 3582 case NDB_PLAN_AUTHORS: 3583 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3584 return 0; 3585 break; 3586 case NDB_PLAN_AUTHOR_KINDS: 3587 /* TODO: author kinds 3588 if (!ndb_query_plan_execute_author_kinds(txn, filter, &results, limit)) 3589 return 0; 3590 */ 3591 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3592 return 0; 3593 break; 3594 } 3595 3596 *results_out = cursor_count(&results.cur, sizeof(*res)); 3597 return 1; 3598 } 3599 3600 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, 3601 struct ndb_query_result *results, int result_capacity, int *count) 3602 { 3603 int i, out; 3604 struct ndb_query_result *p = results; 3605 3606 out = 0; 3607 *count = 0; 3608 3609 for (i = 0; i < num_filters; i++) { 3610 if (!ndb_query_filter(txn, &filters[i], p, 3611 result_capacity, &out)) { 3612 return 0; 3613 } 3614 3615 *count += out; 3616 p += out; 3617 result_capacity -= out; 3618 if (result_capacity <= 0) 3619 break; 3620 } 3621 3622 // sort results 3623 qsort(results, *count, sizeof(*results), compare_query_results); 3624 return 1; 3625 } 3626 3627 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note, 3628 uint64_t note_key) 3629 { 3630 unsigned char key_buffer[255]; 3631 struct ndb_iterator iter; 3632 struct ndb_str tkey, tval; 3633 char tchar; 3634 int len, rc; 3635 MDB_val key, val; 3636 MDB_dbi tags_db; 3637 3638 tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3639 3640 ndb_tags_iterate_start(note, &iter); 3641 3642 while (ndb_tags_iterate_next(&iter)) { 3643 if (iter.tag->count < 2) 3644 continue; 3645 3646 tkey = ndb_tag_str(note, iter.tag, 0); 3647 3648 // we only write indices for 1-char tags. 3649 tchar = tkey.str[0]; 3650 if (tchar == 0 || tkey.str[1] != 0) 3651 continue; 3652 3653 tval = ndb_tag_str(note, iter.tag, 1); 3654 len = ndb_str_len(&tval); 3655 3656 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3657 tchar, tval.id, (unsigned char)len, 3658 ndb_note_created_at(note)))) { 3659 // this will fail when we try to encode a key that is 3660 // too big 3661 continue; 3662 } 3663 3664 //ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len); 3665 3666 key.mv_data = key_buffer; 3667 key.mv_size = len; 3668 3669 val.mv_data = ¬e_key; 3670 val.mv_size = sizeof(note_key); 3671 3672 if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) { 3673 ndb_debug("write note tag index to db failed: %s\n", 3674 mdb_strerror(rc)); 3675 return 0; 3676 } 3677 } 3678 3679 return 1; 3680 } 3681 3682 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 3683 uint64_t note_key) 3684 { 3685 struct ndb_u64_ts tsid; 3686 int rc; 3687 MDB_val key, val; 3688 MDB_dbi kind_db; 3689 3690 ndb_u64_ts_init(&tsid, note->kind, note->created_at); 3691 3692 key.mv_data = &tsid; 3693 key.mv_size = sizeof(tsid); 3694 val.mv_data = ¬e_key; 3695 val.mv_size = sizeof(note_key); 3696 3697 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3698 3699 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 3700 ndb_debug("write note kind index to db failed: %s\n", 3701 mdb_strerror(rc)); 3702 return 0; 3703 } 3704 3705 return 1; 3706 } 3707 3708 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 3709 int word_len, int word_index, 3710 uint64_t timestamp, uint64_t note_id) 3711 { 3712 // cap to some reasonable key size 3713 unsigned char buffer[1024]; 3714 int keysize, rc; 3715 MDB_val k, v; 3716 MDB_dbi text_db; 3717 3718 // build our compressed text index key 3719 if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index, 3720 word_len, word, timestamp, note_id, 3721 &keysize)) { 3722 // probably too big 3723 3724 return 0; 3725 } 3726 3727 k.mv_data = buffer; 3728 k.mv_size = keysize; 3729 3730 v.mv_data = NULL; 3731 v.mv_size = 0; 3732 3733 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 3734 3735 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 3736 ndb_debug("write note text index to db failed: %s\n", 3737 mdb_strerror(rc)); 3738 return 0; 3739 } 3740 3741 return 1; 3742 } 3743 3744 3745 3746 // break a string into individual words for querying or for building the 3747 // fulltext search index. This is callback based so we don't need to 3748 // build up an intermediate structure 3749 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 3750 { 3751 int word_len, words; 3752 const char *word; 3753 3754 words = 0; 3755 3756 while (cur->p < cur->end) { 3757 consume_whitespace_or_punctuation(cur); 3758 if (cur->p >= cur->end) 3759 break; 3760 word = (const char *)cur->p; 3761 3762 if (!consume_until_boundary(cur)) 3763 break; 3764 3765 // start of word or end 3766 word_len = cur->p - (unsigned char *)word; 3767 if (word_len == 0 && cur->p >= cur->end) 3768 break; 3769 3770 if (word_len == 0) { 3771 if (!cursor_skip(cur, 1)) 3772 break; 3773 continue; 3774 } 3775 3776 //ndb_debug("writing word index '%.*s'\n", word_len, word); 3777 3778 if (!fn(ctx, word, word_len, words)) 3779 continue; 3780 3781 words++; 3782 } 3783 3784 return 1; 3785 } 3786 3787 struct ndb_word_writer_ctx 3788 { 3789 struct ndb_txn *txn; 3790 struct ndb_note *note; 3791 uint64_t note_id; 3792 }; 3793 3794 static int ndb_fulltext_word_writer(void *ctx, 3795 const char *word, int word_len, int words) 3796 { 3797 struct ndb_word_writer_ctx *wctx = ctx; 3798 3799 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 3800 wctx->note->created_at, wctx->note_id)) { 3801 // too big to write this one, just skip it 3802 ndb_debug("failed to write word '%.*s' to index\n", word_len, word); 3803 3804 return 0; 3805 } 3806 3807 //fprintf(stderr, "wrote '%.*s' to note text index\n", word_len, word); 3808 return 1; 3809 } 3810 3811 static int ndb_write_note_fulltext_index(struct ndb_txn *txn, 3812 struct ndb_note *note, 3813 uint64_t note_id) 3814 { 3815 struct cursor cur; 3816 unsigned char *content; 3817 struct ndb_str str; 3818 struct ndb_word_writer_ctx ctx; 3819 3820 str = ndb_note_str(note, ¬e->content); 3821 // I don't think this should happen? 3822 if (unlikely(str.flag == NDB_PACKED_ID)) 3823 return 0; 3824 3825 content = (unsigned char *)str.str; 3826 3827 make_cursor(content, content + note->content_length, &cur); 3828 3829 ctx.txn = txn; 3830 ctx.note = note; 3831 ctx.note_id = note_id; 3832 3833 ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer); 3834 3835 return 1; 3836 } 3837 3838 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index) 3839 { 3840 (void)word_index; 3841 struct ndb_search_words *words = ctx; 3842 struct ndb_word *word; 3843 3844 if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS) 3845 return 0; 3846 3847 word = &words->words[words->num_words++]; 3848 word->word = word_str; 3849 word->word_len = word_len; 3850 3851 return 1; 3852 } 3853 3854 static void ndb_search_words_init(struct ndb_search_words *words) 3855 { 3856 words->num_words = 0; 3857 } 3858 3859 static int prefix_count(const char *str1, int len1, const char *str2, int len2) { 3860 int i, count = 0; 3861 int min_len = len1 < len2 ? len1 : len2; 3862 3863 for (i = 0; i < min_len; i++) { 3864 // case insensitive 3865 if (tolower(str1[i]) == tolower(str2[i])) 3866 count++; 3867 else 3868 break; 3869 } 3870 3871 return count; 3872 } 3873 3874 static int ndb_prefix_matches(struct ndb_text_search_result *result, 3875 struct ndb_word *search_word) 3876 { 3877 // Empty strings shouldn't happen but let's 3878 if (result->key.str_len < 2 || search_word->word_len < 2) 3879 return 0; 3880 3881 // make sure we at least have two matching prefix characters. exact 3882 // matches are nice but range searches allow us to match prefixes as 3883 // well. A double-char prefix is suffient, but maybe we could up this 3884 // in the future. 3885 // 3886 // TODO: How are we handling utf-8 prefix matches like 3887 // japanese? 3888 // 3889 if ( result->key.str[0] != tolower(search_word->word[0]) 3890 && result->key.str[1] != tolower(search_word->word[1]) 3891 ) 3892 return 0; 3893 3894 // count the number of prefix-matched characters. This will be used 3895 // for ranking search results 3896 result->prefix_chars = prefix_count(result->key.str, 3897 result->key.str_len, 3898 search_word->word, 3899 search_word->word_len); 3900 3901 if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 3902 return 0; 3903 3904 return 1; 3905 } 3906 3907 // This is called when scanning the full text search index. Scanning stops 3908 // when we no longer have a prefix match for the word 3909 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, 3910 MDB_val *k, struct ndb_word *search_word, 3911 struct ndb_text_search_result *last_result, 3912 struct ndb_text_search_result *result, 3913 MDB_cursor_op order_op) 3914 { 3915 struct cursor key_cursor; 3916 //struct ndb_text_search_key search_key; 3917 MDB_val v; 3918 int retries; 3919 retries = -1; 3920 3921 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 3922 3923 // When op is MDB_SET_RANGE, this initializes the search. Position 3924 // the cursor at the next key greater than or equal to the specified 3925 // key. 3926 // 3927 // Subsequent searches should use MDB_NEXT 3928 if (mdb_cursor_get(cursor, k, &v, op)) { 3929 // we should only do this if we're going in reverse 3930 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 3931 // if set range worked and our key exists, it should be 3932 // the one right before this one 3933 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 3934 return 0; 3935 } else { 3936 return 0; 3937 } 3938 } 3939 3940 retry: 3941 retries++; 3942 /* 3943 printf("continuing from "); 3944 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 3945 ndb_print_text_search_key(&search_key); 3946 } else { printf("??"); } 3947 printf("\n"); 3948 */ 3949 3950 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 3951 3952 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 3953 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 3954 return 0; 3955 } 3956 3957 // if the note id doesn't match the last result, then we stop trying 3958 // each search word 3959 if (last_result && last_result->key.note_id != result->key.note_id) { 3960 return 0; 3961 } 3962 3963 // On success, this could still be not related at all. 3964 // It could just be adjacent to the word. Let's check 3965 // if we have a matching prefix at least. 3966 3967 // Before we unpack the entire key, let's quickly 3968 // unpack just the string to check the prefix. We don't 3969 // need to unpack the entire key if the prefix doesn't 3970 // match 3971 if (!ndb_unpack_text_search_key_string(&key_cursor, 3972 &result->key.str, 3973 &result->key.str_len)) { 3974 // this should never happen 3975 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 3976 return 0; 3977 } 3978 3979 if (!ndb_prefix_matches(result, search_word)) { 3980 /* 3981 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 3982 result->key.str_len, result->key.str, 3983 search_word->word_len, search_word->word); 3984 */ 3985 // we should only do this if we're going in reverse 3986 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 3987 // if set range worked and our key exists, it should be 3988 // the one right before this one 3989 mdb_cursor_get(cursor, k, &v, MDB_PREV); 3990 goto retry; 3991 } else { 3992 return 0; 3993 } 3994 } 3995 3996 // Unpack the remaining text search key, we will need this information 3997 // when building up our search results. 3998 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 3999 // This should never happen 4000 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 4001 return 0; 4002 } 4003 4004 /* 4005 if (last_result) { 4006 if (result->key.word_index < last_result->key.word_index) { 4007 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 4008 result->key.str_len, result->key.str, 4009 last_result->key.str_len, last_result->key.str); 4010 return 0; 4011 } 4012 } 4013 */ 4014 4015 return 1; 4016 } 4017 4018 static void ndb_text_search_results_init( 4019 struct ndb_text_search_results *results) { 4020 results->num_results = 0; 4021 } 4022 4023 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 4024 { 4025 cfg->order = NDB_ORDER_DESCENDING; 4026 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 4027 } 4028 4029 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 4030 enum ndb_search_order order) 4031 { 4032 cfg->order = order; 4033 } 4034 4035 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 4036 { 4037 cfg->limit = limit; 4038 } 4039 4040 static int compare_search_words(const void *pa, const void *pb) 4041 { 4042 struct ndb_word *a, *b; 4043 4044 a = (struct ndb_word *)pa; 4045 b = (struct ndb_word *)pb; 4046 4047 if (a->word_len == b->word_len) { 4048 return 0; 4049 } else if (a->word_len > b->word_len) { 4050 // biggest words should be at the front of the list, 4051 // so we say it's "smaller" here 4052 return -1; 4053 } else { 4054 return 1; 4055 } 4056 } 4057 4058 // Sort search words from largest to smallest. Larger words are less likely 4059 // in the index, allowing our scan to walk fewer words at the root when 4060 // recursively matching. 4061 void sort_largest_to_smallest(struct ndb_search_words *words) 4062 { 4063 qsort(words->words, words->num_words, sizeof(words->words[0]), compare_search_words); 4064 } 4065 4066 4067 int ndb_text_search_with(struct ndb_txn *txn, const char *query, 4068 struct ndb_text_search_results *results, 4069 struct ndb_text_search_config *config, 4070 struct ndb_filter *filter) 4071 { 4072 unsigned char buffer[1024], *buf; 4073 unsigned char saved_buf[1024], *saved; 4074 struct ndb_text_search_result *result, *last_result; 4075 struct ndb_text_search_result candidate, last_candidate; 4076 struct ndb_search_words search_words; 4077 //struct ndb_text_search_key search_key; 4078 struct ndb_word *search_word; 4079 struct ndb_note *note; 4080 struct cursor cur; 4081 uint64_t since, until, timestamp_op, *pint, note_size; 4082 ndb_text_search_key_order_fn key_order_fn; 4083 MDB_dbi text_db; 4084 MDB_cursor *cursor; 4085 MDB_val k, v; 4086 int i, j, keysize, saved_size, limit; 4087 MDB_cursor_op op, order_op; 4088 4089 note_size = 0; 4090 note = 0; 4091 saved = NULL; 4092 ndb_text_search_results_init(results); 4093 ndb_search_words_init(&search_words); 4094 4095 until = UINT64_MAX; 4096 since = 0; 4097 limit = MAX_TEXT_SEARCH_RESULTS; 4098 4099 // until, since from filter 4100 if (filter != NULL) { 4101 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 4102 until = *pint; 4103 4104 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 4105 since = *pint; 4106 4107 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 4108 limit = *pint; 4109 } 4110 4111 order_op = MDB_PREV; 4112 key_order_fn = ndb_make_text_search_key_high; 4113 timestamp_op = until; 4114 if (config) { 4115 if (config->order == NDB_ORDER_ASCENDING) { 4116 order_op = MDB_NEXT; 4117 // set the min timestamp value to since when ascending 4118 timestamp_op = since; 4119 key_order_fn = ndb_make_text_search_key_low; 4120 } 4121 limit = min(limit, config->limit); 4122 } 4123 // end search config 4124 4125 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 4126 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 4127 4128 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 4129 if (search_words.num_words == 0) 4130 return 0; 4131 4132 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 4133 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 4134 return 0; 4135 } 4136 4137 // This should complete the query quicker because the larger words are 4138 // likely to have fewer entries in the search index. This is not always 4139 // true. Words with higher frequency (like bitcoin on nostr in 2024) 4140 // may be slower. TODO: Skip word recursion by leveraging a minimal 4141 // perfect hashmap of parsed words on a note 4142 sort_largest_to_smallest(&search_words); 4143 4144 // for each word, we recursively find all of the submatches 4145 while (results->num_results < limit) { 4146 last_result = NULL; 4147 result = &results->results[results->num_results]; 4148 4149 // if we have saved, then we continue from the last root search 4150 // sequence 4151 if (saved) { 4152 buf = saved_buf; 4153 saved = NULL; 4154 keysize = saved_size; 4155 4156 k.mv_data = buf; 4157 k.mv_size = saved_size; 4158 4159 // reposition the cursor so we can continue 4160 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 4161 break; 4162 4163 op = order_op; 4164 } else { 4165 // construct a packed fulltext search key using this 4166 // word. This key doesn't contain any timestamp or index 4167 // info, so it should range match instead of exact 4168 // match 4169 if (!key_order_fn(buffer, sizeof(buffer), 4170 search_words.words[0].word_len, 4171 search_words.words[0].word, 4172 timestamp_op, 4173 &keysize)) 4174 { 4175 // word is too big to fit in 1024-sized key 4176 continue; 4177 } 4178 4179 buf = buffer; 4180 op = MDB_SET_RANGE; 4181 } 4182 4183 for (j = 0; j < search_words.num_words; j++) { 4184 search_word = &search_words.words[j]; 4185 4186 // shouldn't happen but let's be defensive a bit 4187 if (search_word->word_len == 0) 4188 continue; 4189 4190 // if we already matched a note in this phrase, make 4191 // sure we're including the note id in the query 4192 if (last_result) { 4193 // we are narrowing down a search. 4194 // if we already have this note id, just continue 4195 for (i = 0; i < results->num_results; i++) { 4196 if (results->results[i].key.note_id == last_result->key.note_id) 4197 // we can't break here to 4198 // leave the word loop so 4199 // have to use a goto 4200 goto cont; 4201 } 4202 4203 if (!ndb_make_noted_text_search_key( 4204 buffer, sizeof(buffer), 4205 search_word->word_len, 4206 search_word->word, 4207 last_result->key.timestamp, 4208 last_result->key.note_id, 4209 &keysize)) 4210 { 4211 continue; 4212 } 4213 4214 buf = buffer; 4215 } 4216 4217 k.mv_data = buf; 4218 k.mv_size = keysize; 4219 4220 // TODO: we can speed this up with the minimal perfect 4221 // hashmap by quickly rejecting the remaining words 4222 // by looking in the word hashmap on the note. This 4223 // would allow us to skip the recursive word lookup 4224 // thing 4225 if (!ndb_text_search_next_word(cursor, op, &k, 4226 search_word, 4227 last_result, 4228 &candidate, 4229 order_op)) { 4230 // we didn't find a match for this note_id 4231 if (j == 0) 4232 // if we're at one of the root words, 4233 // this means that there are no further 4234 // root word matches for any note, so 4235 // we know we're done 4236 goto done; 4237 else 4238 break; 4239 } 4240 4241 *result = candidate; 4242 op = MDB_SET_RANGE; 4243 4244 // save the first key match, since we will continue from 4245 // this on the next root word result 4246 if (j == 0) { 4247 if (!saved) { 4248 memcpy(saved_buf, k.mv_data, k.mv_size); 4249 saved = saved_buf; 4250 saved_size = k.mv_size; 4251 } 4252 4253 // since we will be trying to match the same 4254 // note_id on all subsequent word matches, 4255 // let's lookup this note and make sure it 4256 // matches the filter if we have one. If it 4257 // doesn't match, we can quickly skip the 4258 // remaining word queries 4259 if (filter) { 4260 if ((note = ndb_get_note_by_key(txn, 4261 result->key.note_id, 4262 ¬e_size))) 4263 { 4264 if (!ndb_filter_matches(filter, note)) { 4265 break; 4266 } 4267 result->note = note; 4268 result->note_size = note_size; 4269 } 4270 } 4271 } 4272 4273 result->note = note; 4274 result->note_size = note_size; 4275 last_candidate = *result; 4276 last_result = &last_candidate; 4277 } 4278 4279 // we matched all of the queries! 4280 if (j == search_words.num_words) { 4281 results->num_results++; 4282 } 4283 4284 cont: 4285 } 4286 4287 done: 4288 mdb_cursor_close(cursor); 4289 4290 return 1; 4291 } 4292 4293 int ndb_text_search(struct ndb_txn *txn, const char *query, 4294 struct ndb_text_search_results *results, 4295 struct ndb_text_search_config *config) 4296 { 4297 return ndb_text_search_with(txn, query, results, config, NULL); 4298 } 4299 4300 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key, 4301 struct ndb_blocks *blocks) 4302 { 4303 int rc; 4304 MDB_val key, val; 4305 4306 // make sure we're not writing the owned flag to the db 4307 blocks->flags &= ~NDB_BLOCK_FLAG_OWNED; 4308 4309 key.mv_data = ¬e_key; 4310 key.mv_size = sizeof(note_key); 4311 val.mv_data = blocks; 4312 val.mv_size = ndb_blocks_total_size(blocks); 4313 assert((val.mv_size % 8) == 0); 4314 4315 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) { 4316 ndb_debug("write version to note_blocks failed: %s\n", 4317 mdb_strerror(rc)); 4318 return; 4319 } 4320 } 4321 4322 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, 4323 uint64_t note_key, unsigned char *scratch, 4324 size_t scratch_size) 4325 { 4326 size_t content_len; 4327 const char *content; 4328 struct ndb_blocks *blocks; 4329 4330 content_len = ndb_note_content_length(note); 4331 content = ndb_note_content(note); 4332 4333 if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) { 4334 //ndb_debug("failed to parse content '%.*s'\n", content_len, content); 4335 return 0; 4336 } 4337 4338 ndb_write_blocks(txn, note_key, blocks); 4339 return 1; 4340 } 4341 4342 static uint64_t ndb_write_note(struct ndb_txn *txn, 4343 struct ndb_writer_note *note, 4344 unsigned char *scratch, size_t scratch_size, 4345 uint32_t ndb_flags) 4346 { 4347 int rc; 4348 uint64_t note_key, kind; 4349 MDB_dbi note_db; 4350 MDB_val key, val; 4351 4352 kind = note->note->kind; 4353 4354 // let's quickly sanity check if we already have this note 4355 if (ndb_get_notekey_by_id(txn, note->note->id)) 4356 return 0; 4357 4358 // get dbs 4359 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 4360 4361 // get new key 4362 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 4363 4364 // write note to event store 4365 key.mv_data = ¬e_key; 4366 key.mv_size = sizeof(note_key); 4367 val.mv_data = note->note; 4368 val.mv_size = note->note_len; 4369 4370 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 4371 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 4372 return 0; 4373 } 4374 4375 ndb_write_note_id_index(txn, note->note, note_key); 4376 ndb_write_note_kind_index(txn, note->note, note_key); 4377 ndb_write_note_tag_index(txn, note->note, note_key); 4378 ndb_write_note_pubkey_index(txn, note->note, note_key); 4379 ndb_write_note_pubkey_kind_index(txn, note->note, note_key); 4380 4381 // only parse content and do fulltext index on text and longform notes 4382 if (kind == 1 || kind == 30023) { 4383 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_FULLTEXT)) { 4384 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 4385 return 0; 4386 } 4387 4388 // write note blocks 4389 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_NOTE_BLOCKS)) { 4390 ndb_write_new_blocks(txn, note->note, note_key, scratch, scratch_size); 4391 } 4392 } else if (kind == 7 && !ndb_flag_set(ndb_flags, NDB_FLAG_NO_STATS)) { 4393 ndb_write_reaction_stats(txn, note->note); 4394 } 4395 4396 return note_key; 4397 } 4398 4399 static void ndb_monitor_lock(struct ndb_monitor *mon) { 4400 pthread_mutex_lock(&mon->mutex); 4401 } 4402 4403 static void ndb_monitor_unlock(struct ndb_monitor *mon) { 4404 pthread_mutex_unlock(&mon->mutex); 4405 } 4406 4407 struct written_note { 4408 uint64_t note_id; 4409 struct ndb_writer_note *note; 4410 }; 4411 4412 // When the data has been committed to the database, take all of the written 4413 // notes, check them against subscriptions, and then write to the subscription 4414 // inbox for all matching notes 4415 static void ndb_notify_subscriptions(struct ndb_monitor *monitor, 4416 struct written_note *wrote, int num_notes) 4417 { 4418 int i, k; 4419 int pushed; 4420 struct written_note *written; 4421 struct ndb_note *note; 4422 struct ndb_subscription *sub; 4423 4424 ndb_monitor_lock(monitor); 4425 4426 for (i = 0; i < monitor->num_subscriptions; i++) { 4427 sub = &monitor->subscriptions[i]; 4428 ndb_debug("checking subscription %d, %d notes\n", i, num_notes); 4429 4430 pushed = 0; 4431 for (k = 0; k < num_notes; k++) { 4432 written = &wrote[k]; 4433 note = written->note->note; 4434 4435 if (ndb_filter_group_matches(&sub->group, note)) { 4436 ndb_debug("pushing note\n"); 4437 4438 if (!prot_queue_push(&sub->inbox, &written->note_id)) { 4439 ndb_debug("couldn't push note to subscriber"); 4440 } else { 4441 pushed++; 4442 } 4443 } else { 4444 ndb_debug("not pushing note\n"); 4445 } 4446 } 4447 4448 // After pushing all of the matching notes, check to see if we 4449 // have a registered subscription callback. If so, we call it. 4450 // The callback needs to call ndb_poll_for_notes to pull data 4451 // that was just pushed to the queue in the for loop above. 4452 if (monitor->sub_cb != NULL && pushed > 0) { 4453 monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); 4454 } 4455 } 4456 4457 ndb_monitor_unlock(monitor); 4458 } 4459 4460 uint64_t ndb_write_note_and_profile( 4461 struct ndb_txn *txn, 4462 struct ndb_writer_profile *profile, 4463 unsigned char *scratch, 4464 size_t scratch_size, 4465 uint32_t ndb_flags) 4466 { 4467 uint64_t note_nkey; 4468 4469 note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags); 4470 4471 if (profile->record.builder) { 4472 // only write if parsing didn't fail 4473 ndb_write_profile(txn, profile, note_nkey); 4474 } 4475 4476 return note_nkey; 4477 } 4478 4479 // only to be called from the writer thread 4480 static int ndb_write_version(struct ndb_txn *txn, uint64_t version) 4481 { 4482 int rc; 4483 MDB_val key, val; 4484 uint64_t version_key; 4485 4486 version_key = NDB_META_KEY_VERSION; 4487 4488 key.mv_data = &version_key; 4489 key.mv_size = sizeof(version_key); 4490 val.mv_data = &version; 4491 val.mv_size = sizeof(version); 4492 4493 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { 4494 ndb_debug("write version to ndb_meta failed: %s\n", 4495 mdb_strerror(rc)); 4496 return 0; 4497 } 4498 4499 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 4500 return 1; 4501 } 4502 4503 4504 static int ndb_run_migrations(struct ndb_txn *txn) 4505 { 4506 int64_t version, latest_version, i; 4507 4508 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 4509 4510 if ((version = ndb_db_version(txn)) == -1) { 4511 ndb_debug("run_migrations: no version found, assuming new db\n"); 4512 version = latest_version; 4513 4514 // no version found. fresh db? 4515 if (!ndb_write_version(txn, version)) { 4516 fprintf(stderr, "run_migrations: failed writing db version"); 4517 return 0; 4518 } 4519 4520 return 1; 4521 } else { 4522 ndb_debug("ndb: version %" PRIu64 " found\n", version); 4523 } 4524 4525 if (version < latest_version) 4526 fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", 4527 (int)version, (int)latest_version); 4528 4529 for (i = version; i < latest_version; i++) { 4530 if (!MIGRATIONS[i].fn(txn)) { 4531 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 4532 return 0; 4533 } 4534 4535 if (!ndb_write_version(txn, i+1)) { 4536 fprintf(stderr, "run_migrations: failed writing db version"); 4537 return 0; 4538 } 4539 4540 version = i+1; 4541 } 4542 4543 return 1; 4544 } 4545 4546 4547 static void *ndb_writer_thread(void *data) 4548 { 4549 ndb_debug("started writer thread\n"); 4550 struct ndb_writer *writer = data; 4551 struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; 4552 struct written_note written_notes[THREAD_QUEUE_BATCH]; 4553 size_t scratch_size; 4554 int i, popped, done, needs_commit, num_notes; 4555 uint64_t note_nkey; 4556 struct ndb_txn txn; 4557 unsigned char *scratch; 4558 4559 // 8mb scratch buffer for parsing note content 4560 scratch_size = 8 * 1024 * 1024; 4561 scratch = malloc(scratch_size); 4562 MDB_txn *mdb_txn = NULL; 4563 ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); 4564 4565 done = 0; 4566 while (!done) { 4567 txn.mdb_txn = NULL; 4568 num_notes = 0; 4569 ndb_debug("writer waiting for items\n"); 4570 popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); 4571 ndb_debug("writer popped %d items\n", popped); 4572 4573 needs_commit = 0; 4574 for (i = 0 ; i < popped; i++) { 4575 msg = &msgs[i]; 4576 switch (msg->type) { 4577 case NDB_WRITER_NOTE: needs_commit = 1; break; 4578 case NDB_WRITER_PROFILE: needs_commit = 1; break; 4579 case NDB_WRITER_DBMETA: needs_commit = 1; break; 4580 case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; 4581 case NDB_WRITER_BLOCKS: needs_commit = 1; break; 4582 case NDB_WRITER_MIGRATE: needs_commit = 1; break; 4583 case NDB_WRITER_QUIT: break; 4584 } 4585 } 4586 4587 if (needs_commit && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) 4588 { 4589 fprintf(stderr, "writer thread txn_begin failed"); 4590 // should definitely not happen unless DB is full 4591 // or something ? 4592 continue; 4593 } 4594 4595 for (i = 0; i < popped; i++) { 4596 msg = &msgs[i]; 4597 4598 switch (msg->type) { 4599 case NDB_WRITER_QUIT: 4600 // quits are handled before this 4601 ndb_debug("writer thread got quit message\n"); 4602 done = 1; 4603 continue; 4604 case NDB_WRITER_PROFILE: 4605 note_nkey = 4606 ndb_write_note_and_profile( 4607 &txn, 4608 &msg->profile, 4609 scratch, 4610 scratch_size, 4611 writer->ndb_flags); 4612 4613 if (note_nkey > 0) { 4614 written_notes[num_notes++] = 4615 (struct written_note){ 4616 .note_id = note_nkey, 4617 .note = &msg->profile.note, 4618 }; 4619 } else { 4620 ndb_debug("failed to write note\n"); 4621 } 4622 break; 4623 case NDB_WRITER_NOTE: 4624 note_nkey = ndb_write_note(&txn, &msg->note, 4625 scratch, 4626 scratch_size, 4627 writer->ndb_flags); 4628 4629 if (note_nkey > 0) { 4630 written_notes[num_notes++] = (struct written_note){ 4631 .note_id = note_nkey, 4632 .note = &msg->note, 4633 }; 4634 } 4635 break; 4636 case NDB_WRITER_DBMETA: 4637 ndb_write_version(&txn, msg->ndb_meta.version); 4638 break; 4639 case NDB_WRITER_BLOCKS: 4640 ndb_write_blocks(&txn, msg->blocks.note_key, 4641 msg->blocks.blocks); 4642 break; 4643 case NDB_WRITER_MIGRATE: 4644 if (!ndb_run_migrations(&txn)) { 4645 mdb_txn_abort(txn.mdb_txn); 4646 goto bail; 4647 } 4648 break; 4649 case NDB_WRITER_PROFILE_LAST_FETCH: 4650 ndb_writer_last_profile_fetch(&txn, 4651 msg->last_fetch.pubkey, 4652 msg->last_fetch.fetched_at 4653 ); 4654 break; 4655 } 4656 } 4657 4658 // commit writes 4659 if (needs_commit) { 4660 if (!ndb_end_query(&txn)) { 4661 ndb_debug("writer thread txn commit failed\n"); 4662 } else { 4663 ndb_debug("notifying subscriptions, %d notes\n", num_notes); 4664 ndb_notify_subscriptions(writer->monitor, 4665 written_notes, 4666 num_notes); 4667 // update subscriptions 4668 } 4669 } 4670 4671 // free notes 4672 for (i = 0; i < popped; i++) { 4673 msg = &msgs[i]; 4674 if (msg->type == NDB_WRITER_NOTE) { 4675 free(msg->note.note); 4676 } else if (msg->type == NDB_WRITER_PROFILE) { 4677 free(msg->profile.note.note); 4678 //ndb_profile_record_builder_free(&msg->profile.record); 4679 } else if (msg->type == NDB_WRITER_BLOCKS) { 4680 ndb_blocks_free(msg->blocks.blocks); 4681 } 4682 } 4683 } 4684 4685 bail: 4686 free(scratch); 4687 ndb_debug("quitting writer thread\n"); 4688 return NULL; 4689 } 4690 4691 static void *ndb_ingester_thread(void *data) 4692 { 4693 secp256k1_context *ctx; 4694 struct thread *thread = data; 4695 struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; 4696 struct ndb_lmdb *lmdb = ingester->lmdb; 4697 struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; 4698 struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; 4699 int i, to_write, popped, done, any_event; 4700 MDB_txn *read_txn = NULL; 4701 int rc; 4702 4703 ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); 4704 ndb_debug("started ingester thread\n"); 4705 4706 done = 0; 4707 while (!done) { 4708 to_write = 0; 4709 any_event = 0; 4710 4711 popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 4712 ndb_debug("ingester popped %d items\n", popped); 4713 4714 for (i = 0; i < popped; i++) { 4715 msg = &msgs[i]; 4716 if (msg->type == NDB_INGEST_EVENT) { 4717 any_event = 1; 4718 break; 4719 } 4720 } 4721 4722 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 4723 // this is bad 4724 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 4725 mdb_strerror(rc)); 4726 continue; 4727 } 4728 4729 for (i = 0; i < popped; i++) { 4730 msg = &msgs[i]; 4731 switch (msg->type) { 4732 case NDB_INGEST_QUIT: 4733 done = 1; 4734 break; 4735 4736 case NDB_INGEST_EVENT: 4737 out = &outs[to_write]; 4738 if (ndb_ingester_process_event(ctx, ingester, 4739 &msg->event, out, 4740 read_txn)) { 4741 to_write++; 4742 } 4743 } 4744 } 4745 4746 if (any_event) 4747 mdb_txn_abort(read_txn); 4748 4749 if (to_write > 0) { 4750 ndb_debug("pushing %d events to write queue\n", to_write); 4751 if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) { 4752 ndb_debug("failed pushing %d events to write queue\n", to_write); 4753 } 4754 } 4755 } 4756 4757 ndb_debug("quitting ingester thread\n"); 4758 secp256k1_context_destroy(ctx); 4759 return NULL; 4760 } 4761 4762 4763 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, 4764 struct ndb_monitor *monitor, uint32_t ndb_flags) 4765 { 4766 writer->lmdb = lmdb; 4767 writer->monitor = monitor; 4768 writer->ndb_flags = ndb_flags; 4769 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 4770 writer->queue_buf = malloc(writer->queue_buflen); 4771 if (writer->queue_buf == NULL) { 4772 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 4773 return 0; 4774 } 4775 4776 // init the writer queue. 4777 prot_queue_init(&writer->inbox, writer->queue_buf, 4778 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 4779 4780 // spin up the writer thread 4781 if (THREAD_CREATE(writer->thread_id, ndb_writer_thread, writer)) 4782 { 4783 fprintf(stderr, "ndb writer thread failed to create\n"); 4784 return 0; 4785 } 4786 4787 return 1; 4788 } 4789 4790 // initialize the ingester queue and then spawn the thread 4791 static int ndb_ingester_init(struct ndb_ingester *ingester, 4792 struct ndb_lmdb *lmdb, 4793 struct prot_queue *writer_inbox, 4794 const struct ndb_config *config) 4795 { 4796 int elem_size, num_elems; 4797 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 4798 4799 // TODO: configurable queue sizes 4800 elem_size = sizeof(struct ndb_ingester_msg); 4801 num_elems = DEFAULT_QUEUE_SIZE; 4802 4803 ingester->writer_inbox = writer_inbox; 4804 ingester->lmdb = lmdb; 4805 ingester->flags = config->flags; 4806 ingester->filter = config->ingest_filter; 4807 ingester->filter_context = config->filter_context; 4808 4809 if (!threadpool_init(&ingester->tp, config->ingester_threads, 4810 elem_size, num_elems, &quit_msg, ingester, 4811 ndb_ingester_thread)) 4812 { 4813 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 4814 return 0; 4815 } 4816 4817 return 1; 4818 } 4819 4820 static int ndb_writer_destroy(struct ndb_writer *writer) 4821 { 4822 struct ndb_writer_msg msg; 4823 4824 // kill thread 4825 msg.type = NDB_WRITER_QUIT; 4826 ndb_debug("writer: pushing quit message\n"); 4827 if (!prot_queue_push(&writer->inbox, &msg)) { 4828 // queue is too full to push quit message. just kill it. 4829 ndb_debug("writer: terminating thread\n"); 4830 THREAD_TERMINATE(writer->thread_id); 4831 } else { 4832 ndb_debug("writer: joining thread\n"); 4833 THREAD_FINISH(writer->thread_id); 4834 } 4835 4836 // cleanup 4837 ndb_debug("writer: cleaning up protected queue\n"); 4838 prot_queue_destroy(&writer->inbox); 4839 4840 free(writer->queue_buf); 4841 4842 return 1; 4843 } 4844 4845 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 4846 { 4847 threadpool_destroy(&ingester->tp); 4848 return 1; 4849 } 4850 4851 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 4852 { 4853 int rc; 4854 MDB_txn *txn; 4855 4856 if ((rc = mdb_env_create(&lmdb->env))) { 4857 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 4858 return 0; 4859 } 4860 4861 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 4862 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 4863 return 0; 4864 } 4865 4866 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 4867 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 4868 return 0; 4869 } 4870 4871 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 4872 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 4873 return 0; 4874 } 4875 4876 // Initialize DBs 4877 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 4878 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 4879 return 0; 4880 } 4881 4882 // note flatbuffer db 4883 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 4884 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 4885 return 0; 4886 } 4887 4888 // note metadata db 4889 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 4890 fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); 4891 return 0; 4892 } 4893 4894 // profile flatbuffer db 4895 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 4896 fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc); 4897 return 0; 4898 } 4899 4900 // profile search db 4901 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 4902 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 4903 return 0; 4904 } 4905 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 4906 4907 // ndb metadata (db version, etc) 4908 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 4909 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 4910 return 0; 4911 } 4912 4913 // profile last fetches 4914 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 4915 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 4916 return 0; 4917 } 4918 4919 // id+ts index flags 4920 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 4921 4922 // index dbs 4923 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 4924 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 4925 return 0; 4926 } 4927 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 4928 4929 if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 4930 fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc)); 4931 return 0; 4932 } 4933 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare); 4934 4935 if ((rc = mdb_dbi_open(txn, "note_kind", 4936 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4937 &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 4938 fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc)); 4939 return 0; 4940 } 4941 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_ts_compare); 4942 4943 if ((rc = mdb_dbi_open(txn, "note_pubkey", 4944 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4945 &lmdb->dbs[NDB_DB_NOTE_PUBKEY]))) { 4946 fprintf(stderr, "mdb_dbi_open note_pubkey failed: %s\n", mdb_strerror(rc)); 4947 return 0; 4948 } 4949 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY], ndb_tsid_compare); 4950 4951 if ((rc = mdb_dbi_open(txn, "note_pubkey_kind", 4952 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4953 &lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND]))) { 4954 fprintf(stderr, "mdb_dbi_open note_pubkey_kind failed: %s\n", mdb_strerror(rc)); 4955 return 0; 4956 } 4957 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], ndb_id_u64_ts_compare); 4958 4959 if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 4960 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 4961 fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc)); 4962 return 0; 4963 } 4964 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 4965 4966 if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY, 4967 &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) { 4968 fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc)); 4969 return 0; 4970 } 4971 4972 if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, 4973 &lmdb->dbs[NDB_DB_NOTE_TAGS]))) { 4974 fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc)); 4975 return 0; 4976 } 4977 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare); 4978 4979 // Commit the transaction 4980 if ((rc = mdb_txn_commit(txn))) { 4981 fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); 4982 return 0; 4983 } 4984 4985 return 1; 4986 } 4987 4988 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 4989 { 4990 struct ndb_writer_msg msg; 4991 msg.type = NDB_WRITER_DBMETA; 4992 msg.ndb_meta.version = version; 4993 return ndb_writer_queue_msg(&ndb->writer, &msg); 4994 } 4995 4996 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, 4997 void *sub_cb_ctx) 4998 { 4999 monitor->num_subscriptions = 0; 5000 monitor->sub_cb = cb; 5001 monitor->sub_cb_ctx = sub_cb_ctx; 5002 pthread_mutex_init(&monitor->mutex, NULL); 5003 } 5004 5005 void ndb_filter_group_destroy(struct ndb_filter_group *group) 5006 { 5007 struct ndb_filter *filter; 5008 int i; 5009 for (i = 0; i < group->num_filters; i++) { 5010 filter = &group->filters[i]; 5011 ndb_filter_destroy(filter); 5012 } 5013 } 5014 5015 static void ndb_subscription_destroy(struct ndb_subscription *sub) 5016 { 5017 ndb_filter_group_destroy(&sub->group); 5018 // this was malloc'd inside ndb_subscribe 5019 free(sub->inbox.buf); 5020 prot_queue_destroy(&sub->inbox); 5021 sub->subid = 0; 5022 } 5023 5024 static void ndb_monitor_destroy(struct ndb_monitor *monitor) 5025 { 5026 int i; 5027 5028 ndb_monitor_lock(monitor); 5029 5030 for (i = 0; i < monitor->num_subscriptions; i++) { 5031 ndb_subscription_destroy(&monitor->subscriptions[i]); 5032 } 5033 5034 monitor->num_subscriptions = 0; 5035 5036 ndb_monitor_unlock(monitor); 5037 5038 pthread_mutex_destroy(&monitor->mutex); 5039 } 5040 5041 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config) 5042 { 5043 struct ndb *ndb; 5044 //MDB_dbi ind_id; // TODO: ind_pk, etc 5045 5046 ndb = *pndb = calloc(1, sizeof(struct ndb)); 5047 ndb->flags = config->flags; 5048 5049 if (ndb == NULL) { 5050 fprintf(stderr, "ndb_init: malloc failed\n"); 5051 return 0; 5052 } 5053 5054 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 5055 return 0; 5056 5057 ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); 5058 5059 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags)) { 5060 fprintf(stderr, "ndb_writer_init failed\n"); 5061 return 0; 5062 } 5063 5064 if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) { 5065 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 5066 config->ingester_threads); 5067 return 0; 5068 } 5069 5070 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) { 5071 struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE }; 5072 ndb_writer_queue_msg(&ndb->writer, &msg); 5073 } 5074 5075 // Initialize LMDB environment and spin up threads 5076 return 1; 5077 } 5078 5079 void ndb_destroy(struct ndb *ndb) 5080 { 5081 if (ndb == NULL) 5082 return; 5083 5084 // ingester depends on writer and must be destroyed first 5085 ndb_debug("destroying ingester\n"); 5086 ndb_ingester_destroy(&ndb->ingester); 5087 ndb_debug("destroying writer\n"); 5088 ndb_writer_destroy(&ndb->writer); 5089 ndb_debug("destroying monitor\n"); 5090 ndb_monitor_destroy(&ndb->monitor); 5091 5092 ndb_debug("closing env\n"); 5093 mdb_env_close(ndb->lmdb.env); 5094 5095 ndb_debug("ndb destroyed\n"); 5096 free(ndb); 5097 } 5098 5099 // Process a nostr event from a client 5100 // 5101 // ie: ["EVENT", {"content":"..."} ...] 5102 // 5103 // The client-sent variation of ndb_process_event 5104 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 5105 { 5106 return ndb_ingest_event(&ndb->ingester, json, len, 1); 5107 } 5108 5109 // Process anostr event from a relay, 5110 // 5111 // ie: ["EVENT", "subid", {"content":"..."}...] 5112 // 5113 // This function returns as soon as possible, first copying the passed 5114 // json and then queueing it up for processing. Worker threads then take 5115 // the json and process it. 5116 // 5117 // Processing: 5118 // 5119 // 1. The event is parsed into ndb_notes and the signature is validated 5120 // 2. A quick lookup is made on the database to see if we already have 5121 // the note id, if we do we don't need to waste time on json parsing 5122 // or note validation. 5123 // 3. Once validation is done we pass it to the writer queue for writing 5124 // to LMDB. 5125 // 5126 int ndb_process_event(struct ndb *ndb, const char *json, int json_len) 5127 { 5128 return ndb_ingest_event(&ndb->ingester, json, json_len, 0); 5129 } 5130 5131 5132 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client) 5133 { 5134 const char *start, *end, *very_end; 5135 start = ldjson; 5136 end = start + json_len; 5137 very_end = ldjson + json_len; 5138 int (* process)(struct ndb *, const char *, int); 5139 #if DEBUG 5140 int processed = 0; 5141 #endif 5142 process = client ? ndb_process_client_event : ndb_process_event; 5143 5144 while ((end = fast_strchr(start, '\n', very_end - start))) { 5145 //printf("processing '%.*s'\n", (int)(end-start), start); 5146 if (!process(ndb, start, end - start)) { 5147 ndb_debug("ndb_process_client_event failed\n"); 5148 return 0; 5149 } 5150 start = end + 1; 5151 #if DEBUG 5152 processed++; 5153 #endif 5154 } 5155 5156 #if DEBUG 5157 ndb_debug("ndb_process_events: processed %d events\n", processed); 5158 #endif 5159 5160 return 1; 5161 } 5162 5163 #ifndef _WIN32 5164 // TODO: windows 5165 int ndb_process_events_stream(struct ndb *ndb, FILE* fp) 5166 { 5167 char *line = NULL; 5168 size_t len = 0; 5169 ssize_t nread; 5170 5171 while ((nread = getline(&line, &len, fp)) != -1) { 5172 if (line == NULL) 5173 break; 5174 ndb_process_event(ndb, line, len); 5175 } 5176 5177 if (line) 5178 free(line); 5179 5180 return 1; 5181 } 5182 #endif 5183 5184 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) 5185 { 5186 return _ndb_process_events(ndb, ldjson, json_len, 1); 5187 } 5188 5189 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len) 5190 { 5191 return _ndb_process_events(ndb, ldjson, json_len, 0); 5192 } 5193 5194 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag) 5195 { 5196 return cursor_push_u16(cur, tag->count); 5197 } 5198 5199 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, 5200 size_t bufsize) 5201 { 5202 struct ndb_note *note; 5203 int half, size, str_indices_size; 5204 5205 // come on bruh 5206 if (bufsize < sizeof(struct ndb_note) * 2) 5207 return 0; 5208 5209 str_indices_size = bufsize / 32; 5210 size = bufsize - str_indices_size; 5211 half = size / 2; 5212 5213 //debug("size %d half %d str_indices %d\n", size, half, str_indices_size); 5214 5215 // make a safe cursor of our available memory 5216 make_cursor(buf, buf + bufsize, &builder->mem); 5217 5218 note = builder->note = (struct ndb_note *)buf; 5219 5220 // take slices of the memory into subcursors 5221 if (!(cursor_slice(&builder->mem, &builder->note_cur, half) && 5222 cursor_slice(&builder->mem, &builder->strings, half) && 5223 cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) { 5224 return 0; 5225 } 5226 5227 memset(note, 0, sizeof(*note)); 5228 builder->note_cur.p += sizeof(*note); 5229 5230 note->strings = builder->strings.start - buf; 5231 note->version = 1; 5232 5233 return 1; 5234 } 5235 5236 5237 5238 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 5239 const char *json, int json_len, 5240 unsigned char *buf, int bufsize) 5241 { 5242 int half = bufsize / 2; 5243 5244 unsigned char *tok_start = buf + half; 5245 unsigned char *tok_end = buf + bufsize; 5246 5247 p->toks = (jsmntok_t*)tok_start; 5248 p->toks_end = (jsmntok_t*)tok_end; 5249 p->num_tokens = 0; 5250 p->json = json; 5251 p->json_len = json_len; 5252 5253 // ndb_builder gets the first half of the buffer, and jsmn gets the 5254 // second half. I like this way of alloating memory (without actually 5255 // dynamically allocating memory). You get one big chunk upfront and 5256 // then submodules can recursively subdivide it. Maybe you could do 5257 // something even more clever like golden-ratio style subdivision where 5258 // the more important stuff gets a larger chunk and then it spirals 5259 // downward into smaller chunks. Thanks for coming to my TED talk. 5260 5261 if (!ndb_builder_init(&p->builder, buf, half)) 5262 return 0; 5263 5264 jsmn_init(&p->json_parser); 5265 5266 return 1; 5267 } 5268 5269 static inline int ndb_json_parser_parse(struct ndb_json_parser *p, 5270 struct ndb_id_cb *cb) 5271 { 5272 jsmntok_t *tok; 5273 int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); 5274 int res = 5275 jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); 5276 5277 // got an ID! 5278 if (res == -42) { 5279 tok = &p->toks[p->json_parser.toknext-1]; 5280 5281 switch (cb->fn(cb->data, p->json + tok->start)) { 5282 case NDB_IDRES_CONT: 5283 res = jsmn_parse(&p->json_parser, p->json, p->json_len, 5284 p->toks, cap, 0); 5285 break; 5286 case NDB_IDRES_STOP: 5287 return -42; 5288 } 5289 } else if (res == 0) { 5290 return 0; 5291 } 5292 5293 p->num_tokens = res; 5294 p->i = 0; 5295 5296 return 1; 5297 } 5298 5299 static inline int toksize(jsmntok_t *tok) 5300 { 5301 return tok->end - tok->start; 5302 } 5303 5304 5305 5306 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2) 5307 { 5308 switch (c2) { 5309 case 't': return cursor_push_byte(cur, '\t'); 5310 case 'n': return cursor_push_byte(cur, '\n'); 5311 case 'r': return cursor_push_byte(cur, '\r'); 5312 case 'b': return cursor_push_byte(cur, '\b'); 5313 case 'f': return cursor_push_byte(cur, '\f'); 5314 case '\\': return cursor_push_byte(cur, '\\'); 5315 case '/': return cursor_push_byte(cur, '/'); 5316 case '"': return cursor_push_byte(cur, '"'); 5317 case 'u': 5318 // these aren't handled yet 5319 return 0; 5320 default: 5321 return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2); 5322 } 5323 } 5324 5325 static int cursor_push_escaped_char(struct cursor *cur, char c) 5326 { 5327 switch (c) { 5328 case '"': return cursor_push_str(cur, "\\\""); 5329 case '\\': return cursor_push_str(cur, "\\\\"); 5330 case '\b': return cursor_push_str(cur, "\\b"); 5331 case '\f': return cursor_push_str(cur, "\\f"); 5332 case '\n': return cursor_push_str(cur, "\\n"); 5333 case '\r': return cursor_push_str(cur, "\\r"); 5334 case '\t': return cursor_push_str(cur, "\\t"); 5335 // TODO: \u hex hex hex hex 5336 } 5337 return cursor_push_byte(cur, c); 5338 } 5339 5340 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len) 5341 { 5342 int i; 5343 5344 if (len % 2 != 0) 5345 return 0; 5346 5347 if (!cursor_push_byte(cur, '"')) 5348 return 0; 5349 5350 for (i = 0; i < len; i++) { 5351 unsigned int c = ((const unsigned char *)buf)[i]; 5352 if (!cursor_push_byte(cur, hexchar(c >> 4))) 5353 return 0; 5354 if (!cursor_push_byte(cur, hexchar(c & 0xF))) 5355 return 0; 5356 } 5357 5358 if (!cursor_push_byte(cur, '"')) 5359 return 0; 5360 5361 return 1; 5362 } 5363 5364 static int cursor_push_jsonstr(struct cursor *cur, const char *str) 5365 { 5366 int i; 5367 int len; 5368 5369 len = strlen(str); 5370 5371 if (!cursor_push_byte(cur, '"')) 5372 return 0; 5373 5374 for (i = 0; i < len; i++) { 5375 if (!cursor_push_escaped_char(cur, str[i])) 5376 return 0; 5377 } 5378 5379 if (!cursor_push_byte(cur, '"')) 5380 return 0; 5381 5382 return 1; 5383 } 5384 5385 5386 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str) 5387 { 5388 if (str.flag == NDB_PACKED_ID) 5389 return cursor_push_hex_str(cur, str.id, 32); 5390 5391 return cursor_push_jsonstr(cur, str.str); 5392 } 5393 5394 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note, 5395 struct ndb_tag *tag) 5396 { 5397 int i; 5398 5399 if (!cursor_push_byte(cur, '[')) 5400 return 0; 5401 5402 for (i = 0; i < tag->count; i++) { 5403 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i))) 5404 return 0; 5405 if (i != tag->count-1 && !cursor_push_byte(cur, ',')) 5406 return 0; 5407 } 5408 5409 return cursor_push_byte(cur, ']'); 5410 } 5411 5412 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 5413 { 5414 int i; 5415 struct ndb_iterator iter, *it = &iter; 5416 ndb_tags_iterate_start(note, it); 5417 5418 if (!cursor_push_byte(cur, '[')) 5419 return 0; 5420 5421 i = 0; 5422 while (ndb_tags_iterate_next(it)) { 5423 if (!cursor_push_json_tag(cur, note, it->tag)) 5424 return 0; 5425 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 5426 return 0; 5427 i++; 5428 } 5429 5430 if (!cursor_push_byte(cur, ']')) 5431 return 0; 5432 5433 return 1; 5434 } 5435 5436 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen) 5437 { 5438 char timebuf[16] = {0}; 5439 char kindbuf[16] = {0}; 5440 char pubkey[65]; 5441 struct cursor cur; 5442 int ok; 5443 5444 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey)) 5445 return 0; 5446 5447 make_cursor(buf, buf + buflen, &cur); 5448 5449 // TODO: update in 2106 ... 5450 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 5451 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 5452 5453 ok = 5454 cursor_push_str(&cur, "[0,\"") && 5455 cursor_push_str(&cur, pubkey) && 5456 cursor_push_str(&cur, "\",") && 5457 cursor_push_str(&cur, timebuf) && 5458 cursor_push_str(&cur, ",") && 5459 cursor_push_str(&cur, kindbuf) && 5460 cursor_push_str(&cur, ",") && 5461 cursor_push_json_tags(&cur, ev) && 5462 cursor_push_str(&cur, ",") && 5463 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 5464 cursor_push_str(&cur, "]"); 5465 5466 if (!ok) 5467 return 0; 5468 5469 return cur.p - cur.start; 5470 } 5471 5472 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len) 5473 { 5474 int i; 5475 unsigned char chr; 5476 if (c->p + (len * 2) >= c->end) 5477 return 0; 5478 5479 for (i = 0; i < len; i++) { 5480 chr = bytes[i]; 5481 5482 *(c->p++) = hexchar(chr >> 4); 5483 *(c->p++) = hexchar(chr & 0xF); 5484 } 5485 5486 return 1; 5487 } 5488 5489 static int cursor_push_int_str(struct cursor *c, uint64_t num) 5490 { 5491 char timebuf[16] = {0}; 5492 snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num); 5493 return cursor_push_str(c, timebuf); 5494 } 5495 5496 int ndb_note_json(struct ndb_note *note, char *buf, int buflen) 5497 { 5498 struct cursor cur, *c = &cur; 5499 5500 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur); 5501 5502 int ok = cursor_push_str(c, "{\"id\":\"") && 5503 cursor_push_hex(c, ndb_note_id(note), 32) && 5504 cursor_push_str(c, "\",\"pubkey\":\"") && 5505 cursor_push_hex(c, ndb_note_pubkey(note), 32) && 5506 cursor_push_str(c, "\",\"created_at\":") && 5507 cursor_push_int_str(c, ndb_note_created_at(note)) && 5508 cursor_push_str(c, ",\"kind\":") && 5509 cursor_push_int_str(c, ndb_note_kind(note)) && 5510 cursor_push_str(c, ",\"tags\":") && 5511 cursor_push_json_tags(c, note) && 5512 cursor_push_str(c, ",\"content\":") && 5513 cursor_push_jsonstr(c, ndb_note_content(note)) && 5514 cursor_push_str(c, ",\"sig\":\"") && 5515 cursor_push_hex(c, ndb_note_sig(note), 64) && 5516 cursor_push_c_str(c, "\"}"); 5517 5518 if (!ok) { 5519 return 0; 5520 } 5521 5522 return cur.p - cur.start; 5523 } 5524 5525 static int cursor_push_json_elem_array(struct cursor *cur, 5526 const struct ndb_filter *filter, 5527 struct ndb_filter_elements *elems) 5528 { 5529 int i; 5530 unsigned char *id; 5531 const char *str; 5532 uint64_t val; 5533 5534 if (!cursor_push_byte(cur, '[')) 5535 return 0; 5536 5537 for (i = 0; i < elems->count; i++) { 5538 5539 switch (elems->field.elem_type) { 5540 case NDB_ELEMENT_STRING: 5541 str = ndb_filter_get_string_element(filter, elems, i); 5542 if (!cursor_push_jsonstr(cur, str)) 5543 return 0; 5544 break; 5545 case NDB_ELEMENT_ID: 5546 id = ndb_filter_get_id_element(filter, elems, i); 5547 if (!cursor_push_hex_str(cur, id, 32)) 5548 return 0; 5549 break; 5550 case NDB_ELEMENT_INT: 5551 val = ndb_filter_get_int_element(elems, i); 5552 if (!cursor_push_int_str(cur, val)) 5553 return 0; 5554 break; 5555 case NDB_ELEMENT_UNKNOWN: 5556 ndb_debug("unknown element in cursor_push_json_elem_array"); 5557 return 0; 5558 } 5559 5560 if (i != elems->count-1) { 5561 if (!cursor_push_byte(cur, ',')) 5562 return 0; 5563 } 5564 } 5565 5566 if (!cursor_push_str(cur, "]")) 5567 return 0; 5568 5569 return 1; 5570 } 5571 5572 int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen) 5573 { 5574 const char *str; 5575 struct cursor cur, *c = &cur; 5576 struct ndb_filter_elements *elems; 5577 int i; 5578 5579 if (!filter->finalized) { 5580 ndb_debug("filter not finalized in ndb_filter_json\n"); 5581 return 0; 5582 } 5583 5584 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c); 5585 5586 if (!cursor_push_str(c, "{")) 5587 return 0; 5588 5589 for (i = 0; i < filter->num_elements; i++) { 5590 elems = ndb_filter_get_elements(filter, i); 5591 switch (elems->field.type) { 5592 case NDB_FILTER_IDS: 5593 if (!cursor_push_str(c, "\"ids\":")) 5594 return 0; 5595 if (!cursor_push_json_elem_array(c, filter, elems)) 5596 return 0; 5597 break; 5598 case NDB_FILTER_SEARCH: 5599 if (!cursor_push_str(c, "\"search\":")) 5600 return 0; 5601 if (!(str = ndb_filter_get_string_element(filter, elems, 0))) 5602 return 0; 5603 if (!cursor_push_jsonstr(c, str)) 5604 return 0; 5605 break; 5606 case NDB_FILTER_AUTHORS: 5607 if (!cursor_push_str(c, "\"authors\":")) 5608 return 0; 5609 if (!cursor_push_json_elem_array(c, filter, elems)) 5610 return 0; 5611 break; 5612 case NDB_FILTER_KINDS: 5613 if (!cursor_push_str(c, "\"kinds\":")) 5614 return 0; 5615 if (!cursor_push_json_elem_array(c, filter, elems)) 5616 return 0; 5617 break; 5618 case NDB_FILTER_TAGS: 5619 if (!cursor_push_str(c, "\"#")) 5620 return 0; 5621 if (!cursor_push_byte(c, elems->field.tag)) 5622 return 0; 5623 if (!cursor_push_str(c, "\":")) 5624 return 0; 5625 if (!cursor_push_json_elem_array(c, filter, elems)) 5626 return 0; 5627 break; 5628 case NDB_FILTER_SINCE: 5629 if (!cursor_push_str(c, "\"since\":")) 5630 return 0; 5631 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5632 return 0; 5633 break; 5634 case NDB_FILTER_UNTIL: 5635 if (!cursor_push_str(c, "\"until\":")) 5636 return 0; 5637 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5638 return 0; 5639 break; 5640 case NDB_FILTER_LIMIT: 5641 if (!cursor_push_str(c, "\"limit\":")) 5642 return 0; 5643 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5644 return 0; 5645 break; 5646 } 5647 5648 if (i != filter->num_elements-1) { 5649 if (!cursor_push_byte(c, ',')) { 5650 return 0; 5651 } 5652 } 5653 5654 } 5655 5656 if (!cursor_push_c_str(c, "}")) 5657 return 0; 5658 5659 return cur.p - cur.start; 5660 } 5661 5662 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 5663 int len; 5664 5665 if (!(len = ndb_event_commitment(note, buf, buflen))) 5666 return 0; 5667 5668 //fprintf(stderr, "%.*s\n", len, buf); 5669 5670 sha256((struct sha256*)note->id, buf, len); 5671 5672 return 1; 5673 } 5674 5675 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 5676 unsigned char sig[64]) 5677 { 5678 unsigned char aux[32]; 5679 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 5680 5681 if (!fill_random(aux, sizeof(aux))) 5682 return 0; 5683 5684 secp256k1_context *ctx = 5685 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 5686 5687 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 5688 } 5689 5690 int ndb_create_keypair(struct ndb_keypair *kp) 5691 { 5692 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 5693 secp256k1_xonly_pubkey pubkey; 5694 5695 secp256k1_context *ctx = 5696 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 5697 5698 /* Try to create a keypair with a valid context, it should only 5699 * fail if the secret key is zero or out of range. */ 5700 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 5701 return 0; 5702 5703 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 5704 return 0; 5705 5706 /* Serialize the public key. Should always return 1 for a valid public key. */ 5707 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 5708 } 5709 5710 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 5711 { 5712 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 5713 fprintf(stderr, "could not hex decode secret key\n"); 5714 return 0; 5715 } 5716 5717 return ndb_create_keypair(keypair); 5718 } 5719 5720 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 5721 struct ndb_keypair *keypair) 5722 { 5723 int strings_len = builder->strings.p - builder->strings.start; 5724 unsigned char *note_end = builder->note_cur.p + strings_len; 5725 int total_size = note_end - builder->note_cur.start; 5726 5727 // move the strings buffer next to the end of our ndb_note 5728 memmove(builder->note_cur.p, builder->strings.start, strings_len); 5729 5730 // set the strings location 5731 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 5732 5733 // record the total size 5734 //builder->note->size = total_size; 5735 5736 *note = builder->note; 5737 5738 // generate id and sign if we're building this manually 5739 if (keypair) { 5740 // use the remaining memory for building our id buffer 5741 unsigned char *end = builder->mem.end; 5742 unsigned char *start = (unsigned char*)(*note) + total_size; 5743 5744 ndb_builder_set_pubkey(builder, keypair->pubkey); 5745 5746 if (!ndb_calculate_id(builder->note, start, end - start)) 5747 return 0; 5748 5749 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 5750 return 0; 5751 } 5752 5753 // make sure we're aligned as a whole 5754 total_size = (total_size + 7) & ~7; 5755 assert((total_size % 8) == 0); 5756 return total_size; 5757 } 5758 5759 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 5760 { 5761 return builder->note; 5762 } 5763 5764 static union ndb_packed_str ndb_offset_str(uint32_t offset) 5765 { 5766 // ensure accidents like -1 don't corrupt our packed_str 5767 union ndb_packed_str str; 5768 // most significant byte is reserved for ndb_packtype 5769 str.offset = offset & 0xFFFFFF; 5770 return str; 5771 } 5772 5773 5774 /// find an existing string via str_indices. these indices only exist in the 5775 /// builder phase just for this purpose. 5776 static inline int ndb_builder_find_str(struct ndb_builder *builder, 5777 const char *str, int len, 5778 union ndb_packed_str *pstr) 5779 { 5780 // find existing matching string to avoid duplicate strings 5781 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 5782 for (int i = 0; i < indices; i++) { 5783 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 5784 const char *some_str = (const char*)builder->strings.start + index; 5785 5786 if (!memcmp(some_str, str, len) && some_str[len] == '\0') { 5787 // found an existing matching str, use that index 5788 *pstr = ndb_offset_str(index); 5789 return 1; 5790 } 5791 } 5792 5793 return 0; 5794 } 5795 5796 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 5797 int len, union ndb_packed_str *pstr) 5798 { 5799 uint32_t loc; 5800 5801 // no string found, push a new one 5802 loc = builder->strings.p - builder->strings.start; 5803 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 5804 cursor_push_byte(&builder->strings, '\0'))) { 5805 return 0; 5806 } 5807 5808 *pstr = ndb_offset_str(loc); 5809 5810 // record in builder indices. ignore return value, if we can't cache it 5811 // then whatever 5812 cursor_push_u32(&builder->str_indices, loc); 5813 5814 return 1; 5815 } 5816 5817 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 5818 unsigned char *id, 5819 union ndb_packed_str *pstr) 5820 { 5821 // Don't both find id duplicates. very rarely are they duplicated 5822 // and it slows things down quite a bit. If we really care about this 5823 // We can switch to a hash table. 5824 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 5825 // pstr->packed.flag = NDB_PACKED_ID; 5826 // return 1; 5827 //} 5828 5829 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 5830 pstr->packed.flag = NDB_PACKED_ID; 5831 return 1; 5832 } 5833 5834 return 0; 5835 } 5836 5837 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2) 5838 { 5839 union ndb_packed_str str; 5840 str.packed.flag = NDB_PACKED_STR; 5841 str.packed.str[0] = c1; 5842 str.packed.str[1] = c2; 5843 str.packed.str[2] = '\0'; 5844 return str; 5845 } 5846 5847 static union ndb_packed_str ndb_char_to_packed_str(char c) 5848 { 5849 union ndb_packed_str str; 5850 str.packed.flag = NDB_PACKED_STR; 5851 str.packed.str[0] = c; 5852 str.packed.str[1] = '\0'; 5853 return str; 5854 } 5855 5856 5857 /// Check for small strings to pack 5858 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 5859 const char *str, int len, 5860 union ndb_packed_str *pstr, 5861 int pack_ids) 5862 { 5863 unsigned char id_buf[32]; 5864 5865 if (len == 0) { 5866 *pstr = ndb_char_to_packed_str(0); 5867 return 1; 5868 } else if (len == 1) { 5869 *pstr = ndb_char_to_packed_str(str[0]); 5870 return 1; 5871 } else if (len == 2) { 5872 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 5873 return 1; 5874 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 5875 return ndb_builder_push_packed_id(builder, id_buf, pstr); 5876 } 5877 5878 return 0; 5879 } 5880 5881 5882 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 5883 const char *str, int len, 5884 union ndb_packed_str *pstr) 5885 { 5886 if (ndb_builder_find_str(builder, str, len, pstr)) 5887 return 1; 5888 5889 return ndb_builder_push_str(builder, str, len, pstr); 5890 } 5891 5892 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 5893 union ndb_packed_str *pstr, int pack_ids) 5894 { 5895 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5896 return 1; 5897 5898 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 5899 } 5900 5901 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 5902 int len) 5903 { 5904 int pack_ids = 0; 5905 builder->note->content_length = len; 5906 return ndb_builder_make_str(builder, content, len, 5907 &builder->note->content, pack_ids); 5908 } 5909 5910 5911 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 5912 const char *s) 5913 { 5914 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 5915 memcmp(json + tok->start, s, tok_len) == 0) { 5916 return 1; 5917 } 5918 return 0; 5919 } 5920 5921 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 5922 union ndb_packed_str offset) 5923 { 5924 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 5925 return 0; 5926 builder->current_tag->count++; 5927 return 1; 5928 } 5929 5930 /// Unescape and push json strings 5931 static int ndb_builder_make_json_str(struct ndb_builder *builder, 5932 const char *str, int len, 5933 union ndb_packed_str *pstr, 5934 int *written, int pack_ids) 5935 { 5936 // let's not care about de-duping these. we should just unescape 5937 // in-place directly into the strings table. 5938 if (written) 5939 *written = len; 5940 5941 const char *p, *end, *start; 5942 unsigned char *builder_start; 5943 5944 // always try compact strings first 5945 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5946 return 1; 5947 5948 end = str + len; 5949 start = str; // Initialize start to the beginning of the string 5950 5951 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 5952 builder_start = builder->strings.p; 5953 5954 for (p = str; p < end; p++) { 5955 if (*p == '\\' && p+1 < end) { 5956 // Push the chunk of unescaped characters before this escape sequence 5957 if (start < p && !cursor_push(&builder->strings, 5958 (unsigned char *)start, 5959 p - start)) { 5960 return 0; 5961 } 5962 5963 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 5964 return 0; 5965 5966 p++; // Skip the character following the backslash 5967 start = p + 1; // Update the start pointer to the next character 5968 } 5969 } 5970 5971 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 5972 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 5973 p - start)) { 5974 return 0; 5975 } 5976 5977 if (written) 5978 *written = builder->strings.p - builder_start; 5979 5980 // TODO: dedupe these!? 5981 return cursor_push_byte(&builder->strings, '\0'); 5982 } 5983 5984 static int ndb_builder_push_json_tag(struct ndb_builder *builder, 5985 const char *str, int len) 5986 { 5987 union ndb_packed_str pstr; 5988 int pack_ids = 1; 5989 if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids)) 5990 return 0; 5991 return ndb_builder_finalize_tag(builder, pstr); 5992 } 5993 5994 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag 5995 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p, 5996 jsmntok_t *array) 5997 { 5998 jsmntok_t *str_tok; 5999 const char *str; 6000 6001 if (array->size == 0) 6002 return 0; 6003 6004 if (!ndb_builder_new_tag(&p->builder)) 6005 return 0; 6006 6007 for (int i = 0; i < array->size; i++) { 6008 str_tok = &array[i+1]; 6009 str = p->json + str_tok->start; 6010 6011 if (!ndb_builder_push_json_tag(&p->builder, str, 6012 toksize(str_tok))) { 6013 return 0; 6014 } 6015 } 6016 6017 return 1; 6018 } 6019 6020 // Push json tags into ndb data 6021 // [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags 6022 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p, 6023 jsmntok_t *array) 6024 { 6025 jsmntok_t *tag = array; 6026 6027 if (array->size == 0) 6028 return 1; 6029 6030 for (int i = 0; i < array->size; i++) { 6031 if (!ndb_builder_tag_from_json_array(p, &tag[i+1])) 6032 return 0; 6033 tag += tag[i+1].size; 6034 } 6035 6036 return 1; 6037 } 6038 6039 static int parse_unsigned_int(const char *start, int len, unsigned int *num) 6040 { 6041 unsigned int number = 0; 6042 const char *p = start, *end = start + len; 6043 int digits = 0; 6044 6045 while (p < end) { 6046 char c = *p; 6047 6048 if (c < '0' || c > '9') 6049 break; 6050 6051 // Check for overflow 6052 char digit = c - '0'; 6053 if (number > (UINT_MAX - digit) / 10) 6054 return 0; // Overflow detected 6055 6056 number = number * 10 + digit; 6057 6058 p++; 6059 digits++; 6060 } 6061 6062 if (digits == 0) 6063 return 0; 6064 6065 *num = number; 6066 return 1; 6067 } 6068 6069 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, 6070 unsigned char *buf, int bufsize, struct ndb_id_cb *cb) 6071 { 6072 jsmntok_t *tok = NULL; 6073 int tok_len, res; 6074 struct ndb_json_parser parser; 6075 struct ndb_event *ev = &fce->event; 6076 6077 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6078 6079 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 6080 return res; 6081 6082 if (parser.toks[0].type == JSMN_OBJECT) { 6083 ndb_debug("got raw json in client_event_from_json\n"); 6084 fce->evtype = NDB_FCE_EVENT; 6085 return ndb_parse_json_note(&parser, &ev->note); 6086 } 6087 6088 if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY) 6089 return 0; 6090 6091 parser.i = 1; 6092 tok = &parser.toks[parser.i++]; 6093 tok_len = toksize(tok); 6094 if (tok->type != JSMN_STRING) 6095 return 0; 6096 6097 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 6098 fce->evtype = NDB_FCE_EVENT; 6099 return ndb_parse_json_note(&parser, &ev->note); 6100 } 6101 6102 return 0; 6103 } 6104 6105 6106 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, 6107 unsigned char *buf, int bufsize, 6108 struct ndb_id_cb *cb) 6109 { 6110 jsmntok_t *tok = NULL; 6111 int tok_len, res; 6112 struct ndb_json_parser parser; 6113 struct ndb_event *ev = &tce->event; 6114 6115 tce->subid_len = 0; 6116 tce->subid = ""; 6117 6118 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6119 6120 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 6121 return res; 6122 6123 if (parser.toks[0].type == JSMN_OBJECT) { 6124 ndb_debug("got raw json in ws_event_from_json\n"); 6125 tce->evtype = NDB_TCE_EVENT; 6126 return ndb_parse_json_note(&parser, &ev->note); 6127 } 6128 6129 if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) 6130 return 0; 6131 6132 parser.i = 1; 6133 tok = &parser.toks[parser.i++]; 6134 tok_len = toksize(tok); 6135 if (tok->type != JSMN_STRING) 6136 return 0; 6137 6138 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 6139 tce->evtype = NDB_TCE_EVENT; 6140 6141 tok = &parser.toks[parser.i++]; 6142 if (tok->type != JSMN_STRING) 6143 return 0; 6144 6145 tce->subid = json + tok->start; 6146 tce->subid_len = toksize(tok); 6147 6148 return ndb_parse_json_note(&parser, &ev->note); 6149 } else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 6150 tce->evtype = NDB_TCE_EOSE; 6151 6152 tok = &parser.toks[parser.i++]; 6153 if (tok->type != JSMN_STRING) 6154 return 0; 6155 6156 tce->subid = json + tok->start; 6157 tce->subid_len = toksize(tok); 6158 return 1; 6159 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 6160 if (parser.num_tokens != 5) 6161 return 0; 6162 6163 struct ndb_command_result *cr = &tce->command_result; 6164 6165 tce->evtype = NDB_TCE_OK; 6166 6167 tok = &parser.toks[parser.i++]; 6168 if (tok->type != JSMN_STRING) 6169 return 0; 6170 6171 tce->subid = json + tok->start; 6172 tce->subid_len = toksize(tok); 6173 6174 tok = &parser.toks[parser.i++]; 6175 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 6176 return 0; 6177 6178 cr->ok = (json + tok->start)[0] == 't'; 6179 6180 tok = &parser.toks[parser.i++]; 6181 if (tok->type != JSMN_STRING) 6182 return 0; 6183 6184 tce->command_result.msg = json + tok->start; 6185 tce->command_result.msglen = toksize(tok); 6186 6187 return 1; 6188 } else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) { 6189 tce->evtype = NDB_TCE_AUTH; 6190 6191 tok = &parser.toks[parser.i++]; 6192 if (tok->type != JSMN_STRING) 6193 return 0; 6194 6195 tce->subid = json + tok->start; 6196 tce->subid_len = toksize(tok); 6197 6198 return 1; 6199 } 6200 6201 return 0; 6202 } 6203 6204 static enum ndb_filter_fieldtype 6205 ndb_filter_parse_field(const char *tok, int len, char *tagchar) 6206 { 6207 *tagchar = 0; 6208 6209 if (len == 0) 6210 return 0; 6211 6212 if (len == 7 && !strncmp(tok, "authors", 7)) { 6213 return NDB_FILTER_AUTHORS; 6214 } else if (len == 3 && !strncmp(tok, "ids", 3)) { 6215 return NDB_FILTER_IDS; 6216 } else if (len == 5 && !strncmp(tok, "kinds", 5)) { 6217 return NDB_FILTER_KINDS; 6218 } else if (len == 2 && tok[0] == '#') { 6219 *tagchar = tok[1]; 6220 return NDB_FILTER_TAGS; 6221 } else if (len == 5 && !strncmp(tok, "since", 5)) { 6222 return NDB_FILTER_SINCE; 6223 } else if (len == 5 && !strncmp(tok, "until", 5)) { 6224 return NDB_FILTER_UNTIL; 6225 } else if (len == 5 && !strncmp(tok, "limit", 5)) { 6226 return NDB_FILTER_LIMIT; 6227 } else if (len == 6 && !strncmp(tok, "search", 6)) { 6228 return NDB_FILTER_SEARCH; 6229 } 6230 6231 return 0; 6232 } 6233 6234 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser, 6235 struct ndb_filter *filter) 6236 { 6237 jsmntok_t *tok; 6238 const char *start; 6239 unsigned char hexbuf[32]; 6240 int tok_len, i, size; 6241 6242 tok = &parser->toks[parser->i++]; 6243 6244 if (tok->type != JSMN_ARRAY) { 6245 ndb_debug("parse_json_ids: not an array\n"); 6246 return 0; 6247 } 6248 6249 size = tok->size; 6250 6251 for (i = 0; i < size; parser->i++, i++) { 6252 tok = &parser->toks[parser->i]; 6253 start = parser->json + tok->start; 6254 tok_len = toksize(tok); 6255 6256 if (tok->type != JSMN_STRING) { 6257 ndb_debug("parse_json_ids: not a string '%d'\n", tok->type); 6258 return 0; 6259 } 6260 6261 if (tok_len != 64) { 6262 ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start); 6263 return 0; 6264 } 6265 6266 // id 6267 if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) { 6268 ndb_debug("parse_json_ids: hex decode failed\n"); 6269 return 0; 6270 } 6271 6272 ndb_debug("adding id elem\n"); 6273 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6274 ndb_debug("parse_json_ids: failed to add id element\n"); 6275 return 0; 6276 } 6277 } 6278 6279 parser->i--; 6280 return 1; 6281 } 6282 6283 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser, 6284 struct ndb_filter *filter) 6285 { 6286 jsmntok_t *tok; 6287 const char *start; 6288 int tok_len; 6289 unsigned char hexbuf[32]; 6290 enum ndb_generic_element_type typ; 6291 tok = NULL; 6292 int i, size; 6293 6294 tok = &parser->toks[parser->i++]; 6295 6296 if (tok->type != JSMN_ARRAY) 6297 return 0; 6298 6299 size = tok->size; 6300 6301 for (i = 0; i < size; i++, parser->i++) { 6302 tok = &parser->toks[parser->i]; 6303 start = parser->json + tok->start; 6304 tok_len = toksize(tok); 6305 6306 if (tok->type != JSMN_STRING) 6307 return 0; 6308 6309 if (i == 0) { 6310 if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) { 6311 typ = NDB_ELEMENT_ID; 6312 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6313 ndb_debug("failed to push id elem\n"); 6314 return 0; 6315 } 6316 } else { 6317 typ = NDB_ELEMENT_STRING; 6318 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6319 return 0; 6320 } 6321 } else if (typ == NDB_ELEMENT_ID) { 6322 if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf))) 6323 return 0; 6324 if (!ndb_filter_add_id_element(filter, hexbuf)) 6325 return 0; 6326 } else if (typ == NDB_ELEMENT_STRING) { 6327 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6328 return 0; 6329 } else { 6330 // ??? 6331 return 0; 6332 } 6333 } 6334 6335 parser->i--; 6336 return 1; 6337 } 6338 6339 static int ndb_filter_parse_json_str(struct ndb_json_parser *parser, 6340 struct ndb_filter *filter) 6341 { 6342 jsmntok_t *tok; 6343 const char *start; 6344 int tok_len; 6345 6346 tok = &parser->toks[parser->i]; 6347 start = parser->json + tok->start; 6348 tok_len = toksize(tok); 6349 6350 if (tok->type != JSMN_STRING) 6351 return 0; 6352 6353 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6354 return 0; 6355 6356 ndb_debug("added str elem '%.*s'\n", tok_len, start); 6357 6358 return 1; 6359 } 6360 6361 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser, 6362 struct ndb_filter *filter) 6363 { 6364 jsmntok_t *tok; 6365 const char *start; 6366 int tok_len; 6367 unsigned int value; 6368 6369 tok = &parser->toks[parser->i]; 6370 start = parser->json + tok->start; 6371 tok_len = toksize(tok); 6372 6373 if (tok->type != JSMN_PRIMITIVE) 6374 return 0; 6375 6376 if (!parse_unsigned_int(start, tok_len, &value)) 6377 return 0; 6378 6379 if (!ndb_filter_add_int_element(filter, (uint64_t)value)) 6380 return 0; 6381 6382 ndb_debug("added int elem %d\n", value); 6383 6384 return 1; 6385 } 6386 6387 6388 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser, 6389 struct ndb_filter *filter) 6390 { 6391 jsmntok_t *tok; 6392 int size, i; 6393 6394 tok = &parser->toks[parser->i++]; 6395 6396 if (tok->type != JSMN_ARRAY) 6397 return 0; 6398 6399 size = tok->size; 6400 6401 for (i = 0; i < size; parser->i++, i++) { 6402 if (!ndb_filter_parse_json_int(parser, filter)) 6403 return 0; 6404 } 6405 6406 parser->i--; 6407 return 1; 6408 } 6409 6410 6411 static int ndb_filter_parse_json(struct ndb_json_parser *parser, 6412 struct ndb_filter *filter) 6413 { 6414 jsmntok_t *tok = NULL; 6415 const char *json = parser->json; 6416 const char *start; 6417 char tag; 6418 int tok_len; 6419 enum ndb_filter_fieldtype field; 6420 6421 if (parser->toks[parser->i++].type != JSMN_OBJECT) 6422 return 0; 6423 6424 for (; parser->i < parser->num_tokens; parser->i++) { 6425 tok = &parser->toks[parser->i++]; 6426 start = json + tok->start; 6427 tok_len = toksize(tok); 6428 6429 if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) { 6430 ndb_debug("failed field '%.*s'\n", tok_len, start); 6431 continue; 6432 } 6433 6434 if (tag) { 6435 ndb_debug("starting tag field '%c'\n", tag); 6436 if (!ndb_filter_start_tag_field(filter, tag)) { 6437 ndb_debug("failed to start tag field '%c'\n", tag); 6438 return 0; 6439 } 6440 } else if (!ndb_filter_start_field(filter, field)) { 6441 ndb_debug("field already started\n"); 6442 return 0; 6443 } 6444 6445 // we parsed a top-level field 6446 switch(field) { 6447 case NDB_FILTER_AUTHORS: 6448 case NDB_FILTER_IDS: 6449 if (!ndb_filter_parse_json_ids(parser, filter)) { 6450 ndb_debug("failed to parse filter ids/authors\n"); 6451 return 0; 6452 } 6453 break; 6454 case NDB_FILTER_SEARCH: 6455 if (!ndb_filter_parse_json_str(parser, filter)) { 6456 ndb_debug("failed to parse filter search str\n"); 6457 return 0; 6458 } 6459 break; 6460 case NDB_FILTER_SINCE: 6461 case NDB_FILTER_UNTIL: 6462 case NDB_FILTER_LIMIT: 6463 if (!ndb_filter_parse_json_int(parser, filter)) { 6464 ndb_debug("failed to parse filter since/until/limit\n"); 6465 return 0; 6466 } 6467 break; 6468 case NDB_FILTER_KINDS: 6469 if (!ndb_filter_parse_json_ints(parser, filter)) { 6470 ndb_debug("failed to parse filter kinds\n"); 6471 return 0; 6472 } 6473 break; 6474 case NDB_FILTER_TAGS: 6475 if (!ndb_filter_parse_json_elems(parser, filter)) { 6476 ndb_debug("failed to parse filter tags\n"); 6477 return 0; 6478 } 6479 break; 6480 } 6481 6482 ndb_filter_end_field(filter); 6483 } 6484 6485 return ndb_filter_end(filter); 6486 } 6487 6488 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 6489 { 6490 jsmntok_t *tok = NULL; 6491 unsigned char hexbuf[64]; 6492 const char *json = parser->json; 6493 const char *start; 6494 int i, tok_len, parsed; 6495 6496 parsed = 0; 6497 6498 if (parser->toks[parser->i].type != JSMN_OBJECT) 6499 return 0; 6500 6501 // TODO: build id buffer and verify at end 6502 6503 for (i = parser->i + 1; i < parser->num_tokens; i++) { 6504 tok = &parser->toks[i]; 6505 start = json + tok->start; 6506 tok_len = toksize(tok); 6507 6508 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 6509 if (tok_len == 0 || i + 1 >= parser->num_tokens) 6510 continue; 6511 6512 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 6513 // pubkey 6514 tok = &parser->toks[i+1]; 6515 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6516 parsed |= NDB_PARSED_PUBKEY; 6517 ndb_builder_set_pubkey(&parser->builder, hexbuf); 6518 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 6519 // id 6520 tok = &parser->toks[i+1]; 6521 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6522 parsed |= NDB_PARSED_ID; 6523 ndb_builder_set_id(&parser->builder, hexbuf); 6524 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 6525 // sig 6526 tok = &parser->toks[i+1]; 6527 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6528 parsed |= NDB_PARSED_SIG; 6529 ndb_builder_set_sig(&parser->builder, hexbuf); 6530 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 6531 // kind 6532 tok = &parser->toks[i+1]; 6533 start = json + tok->start; 6534 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6535 return 0; 6536 if (!parse_unsigned_int(start, toksize(tok), 6537 &parser->builder.note->kind)) 6538 return 0; 6539 parsed |= NDB_PARSED_KIND; 6540 } else if (start[0] == 'c') { 6541 if (jsoneq(json, tok, tok_len, "created_at")) { 6542 // created_at 6543 tok = &parser->toks[i+1]; 6544 start = json + tok->start; 6545 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6546 return 0; 6547 // TODO: update to int64 in 2106 ... xD 6548 unsigned int bigi; 6549 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 6550 return 0; 6551 parser->builder.note->created_at = bigi; 6552 parsed |= NDB_PARSED_CREATED_AT; 6553 } else if (jsoneq(json, tok, tok_len, "content")) { 6554 // content 6555 tok = &parser->toks[i+1]; 6556 union ndb_packed_str pstr; 6557 tok_len = toksize(tok); 6558 int written, pack_ids = 0; 6559 if (!ndb_builder_make_json_str(&parser->builder, 6560 json + tok->start, 6561 tok_len, &pstr, 6562 &written, pack_ids)) { 6563 ndb_debug("ndb_builder_make_json_str failed\n"); 6564 return 0; 6565 } 6566 parser->builder.note->content_length = written; 6567 parser->builder.note->content = pstr; 6568 parsed |= NDB_PARSED_CONTENT; 6569 } 6570 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 6571 tok = &parser->toks[i+1]; 6572 ndb_builder_process_json_tags(parser, tok); 6573 i += tok->size; 6574 parsed |= NDB_PARSED_TAGS; 6575 } 6576 } 6577 6578 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 6579 if (parsed != NDB_PARSED_ALL) 6580 return 0; 6581 6582 return ndb_builder_finalize(&parser->builder, note, NULL); 6583 } 6584 6585 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter, 6586 unsigned char *buf, int bufsize) 6587 { 6588 struct ndb_json_parser parser; 6589 int res; 6590 6591 if (filter->finalized) 6592 return 0; 6593 6594 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6595 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6596 return res; 6597 6598 if (parser.num_tokens < 1) 6599 return 0; 6600 6601 return ndb_filter_parse_json(&parser, filter); 6602 } 6603 6604 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 6605 unsigned char *buf, int bufsize) 6606 { 6607 struct ndb_json_parser parser; 6608 int res; 6609 6610 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6611 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6612 return res; 6613 6614 if (parser.num_tokens < 1) 6615 return 0; 6616 6617 return ndb_parse_json_note(&parser, note); 6618 } 6619 6620 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 6621 { 6622 memcpy(builder->note->pubkey, pubkey, 32); 6623 } 6624 6625 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 6626 { 6627 memcpy(builder->note->id, id, 32); 6628 } 6629 6630 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 6631 { 6632 memcpy(builder->note->sig, sig, 64); 6633 } 6634 6635 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 6636 { 6637 builder->note->kind = kind; 6638 } 6639 6640 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 6641 { 6642 builder->note->created_at = created_at; 6643 } 6644 6645 int ndb_builder_new_tag(struct ndb_builder *builder) 6646 { 6647 builder->note->tags.count++; 6648 struct ndb_tag tag = {0}; 6649 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 6650 return cursor_push_tag(&builder->note_cur, &tag); 6651 } 6652 6653 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 6654 { 6655 counts->count = 0; 6656 counts->key_size = 0; 6657 counts->value_size = 0; 6658 } 6659 6660 static void ndb_stat_init(struct ndb_stat *stat) 6661 { 6662 // init stats 6663 int i; 6664 6665 for (i = 0; i < NDB_CKIND_COUNT; i++) { 6666 ndb_stat_counts_init(&stat->common_kinds[i]); 6667 } 6668 6669 for (i = 0; i < NDB_DBS; i++) { 6670 ndb_stat_counts_init(&stat->dbs[i]); 6671 } 6672 6673 ndb_stat_counts_init(&stat->other_kinds); 6674 } 6675 6676 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 6677 { 6678 int rc; 6679 MDB_cursor *cur; 6680 MDB_val k, v; 6681 MDB_dbi db; 6682 struct ndb_txn txn; 6683 struct ndb_note *note; 6684 int i; 6685 enum ndb_common_kind common_kind; 6686 6687 // initialize to 0 6688 ndb_stat_init(stat); 6689 6690 if (!ndb_begin_query(ndb, &txn)) { 6691 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 6692 return 0; 6693 } 6694 6695 // stat each dbi in the database 6696 for (i = 0; i < NDB_DBS; i++) 6697 { 6698 db = ndb->lmdb.dbs[i]; 6699 6700 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 6701 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 6702 mdb_strerror(rc)); 6703 return 0; 6704 } 6705 6706 // loop over every entry and count kv sizes 6707 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6708 // we gather more detailed per-kind stats if we're in 6709 // the notes db 6710 if (i == NDB_DB_NOTE) { 6711 note = v.mv_data; 6712 common_kind = ndb_kind_to_common_kind(note->kind); 6713 6714 // uncommon kind? just count them in bulk 6715 if ((int)common_kind == -1) { 6716 stat->other_kinds.count++; 6717 stat->other_kinds.key_size += k.mv_size; 6718 stat->other_kinds.value_size += v.mv_size; 6719 } else { 6720 stat->common_kinds[common_kind].count++; 6721 stat->common_kinds[common_kind].key_size += k.mv_size; 6722 stat->common_kinds[common_kind].value_size += v.mv_size; 6723 } 6724 } 6725 6726 stat->dbs[i].count++; 6727 stat->dbs[i].key_size += k.mv_size; 6728 stat->dbs[i].value_size += v.mv_size; 6729 } 6730 6731 // close the cursor, they are per-dbi 6732 mdb_cursor_close(cur); 6733 } 6734 6735 ndb_end_query(&txn); 6736 6737 return 1; 6738 } 6739 6740 /// Push an element to the current tag 6741 /// 6742 /// Basic idea is to call ndb_builder_new_tag 6743 int ndb_builder_push_tag_str(struct ndb_builder *builder, 6744 const char *str, int len) 6745 { 6746 union ndb_packed_str pstr; 6747 int pack_ids = 1; 6748 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 6749 return 0; 6750 return ndb_builder_finalize_tag(builder, pstr); 6751 } 6752 6753 // 6754 // CONFIG 6755 // 6756 void ndb_default_config(struct ndb_config *config) 6757 { 6758 int cores = get_cpu_cores(); 6759 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 6760 config->ingester_threads = cores == -1 ? 4 : cores; 6761 config->flags = 0; 6762 config->ingest_filter = NULL; 6763 config->filter_context = NULL; 6764 config->sub_cb_ctx = NULL; 6765 config->sub_cb = NULL; 6766 } 6767 6768 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) 6769 { 6770 config->sub_cb_ctx = context; 6771 config->sub_cb = fn; 6772 } 6773 6774 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 6775 { 6776 config->ingester_threads = threads; 6777 } 6778 6779 void ndb_config_set_flags(struct ndb_config *config, int flags) 6780 { 6781 config->flags = flags; 6782 } 6783 6784 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 6785 { 6786 config->mapsize = mapsize; 6787 } 6788 6789 void ndb_config_set_ingest_filter(struct ndb_config *config, 6790 ndb_ingest_filter_fn fn, void *filter_ctx) 6791 { 6792 config->ingest_filter = fn; 6793 config->filter_context = filter_ctx; 6794 } 6795 6796 int ndb_print_tag_index(struct ndb_txn *txn) 6797 { 6798 MDB_cursor *cur; 6799 MDB_val k, v; 6800 int i; 6801 6802 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur)) 6803 return 0; 6804 6805 i = 1; 6806 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6807 printf("%d ", i); 6808 print_tag_kv(txn, &k, &v); 6809 i++; 6810 } 6811 6812 return 1; 6813 } 6814 6815 int ndb_print_kind_keys(struct ndb_txn *txn) 6816 { 6817 MDB_cursor *cur; 6818 MDB_val k, v; 6819 int i; 6820 struct ndb_u64_ts *tsid; 6821 6822 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur)) 6823 return 0; 6824 6825 i = 1; 6826 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6827 tsid = k.mv_data; 6828 printf("%d note_kind %" PRIu64 " %" PRIu64 "\n", 6829 i, tsid->u64, tsid->timestamp); 6830 6831 i++; 6832 } 6833 6834 return 1; 6835 } 6836 6837 // used by ndb.c 6838 int ndb_print_search_keys(struct ndb_txn *txn) 6839 { 6840 MDB_cursor *cur; 6841 MDB_val k, v; 6842 int i; 6843 struct ndb_text_search_key search_key; 6844 6845 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 6846 return 0; 6847 6848 i = 1; 6849 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6850 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 6851 fprintf(stderr, "error decoding key %d\n", i); 6852 continue; 6853 } 6854 6855 ndb_print_text_search_key(&search_key); 6856 printf("\n"); 6857 6858 i++; 6859 } 6860 6861 return 1; 6862 } 6863 6864 struct ndb_tags *ndb_note_tags(struct ndb_note *note) 6865 { 6866 return ¬e->tags; 6867 } 6868 6869 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr) 6870 { 6871 struct ndb_str str; 6872 str.flag = pstr->packed.flag; 6873 6874 if (str.flag == NDB_PACKED_STR) { 6875 str.str = pstr->packed.str; 6876 return str; 6877 } 6878 6879 str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF); 6880 return str; 6881 } 6882 6883 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) 6884 { 6885 return ndb_note_str(note, &tag->strs[ind]); 6886 } 6887 6888 int ndb_str_len(struct ndb_str *str) 6889 { 6890 if (str->flag == NDB_PACKED_ID) 6891 return 32; 6892 return strlen(str->str); 6893 } 6894 6895 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind) 6896 { 6897 return ndb_tag_str(iter->note, iter->tag, ind); 6898 } 6899 6900 unsigned char * ndb_note_id(struct ndb_note *note) 6901 { 6902 return note->id; 6903 } 6904 6905 unsigned char * ndb_note_pubkey(struct ndb_note *note) 6906 { 6907 return note->pubkey; 6908 } 6909 6910 unsigned char * ndb_note_sig(struct ndb_note *note) 6911 { 6912 return note->sig; 6913 } 6914 6915 uint32_t ndb_note_created_at(struct ndb_note *note) 6916 { 6917 return note->created_at; 6918 } 6919 6920 uint32_t ndb_note_kind(struct ndb_note *note) 6921 { 6922 return note->kind; 6923 } 6924 6925 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind) 6926 { 6927 note->kind = kind; 6928 } 6929 6930 const char *ndb_note_content(struct ndb_note *note) 6931 { 6932 return ndb_note_str(note, ¬e->content).str; 6933 } 6934 6935 uint32_t ndb_note_content_length(struct ndb_note *note) 6936 { 6937 return note->content_length; 6938 } 6939 6940 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes) 6941 { 6942 struct ndb_note *note = (struct ndb_note *)bytes; 6943 if (note->version != 1) 6944 return 0; 6945 return note; 6946 } 6947 6948 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) 6949 { 6950 iter->note = note; 6951 iter->tag = NULL; 6952 iter->index = -1; 6953 } 6954 6955 // Helper function to get a pointer to the nth tag 6956 static struct ndb_tag *ndb_tags_tag(struct ndb_tags *tags, size_t index) { 6957 return (struct ndb_tag *)((uint8_t *)tags + sizeof(struct ndb_tags) + index * sizeof(struct ndb_tag)); 6958 } 6959 6960 int ndb_tags_iterate_next(struct ndb_iterator *iter) 6961 { 6962 struct ndb_tags *tags; 6963 6964 if (iter->tag == NULL || iter->index == -1) { 6965 iter->tag = ndb_tags_tag(&iter->note->tags, 0); 6966 iter->index = 0; 6967 return iter->note->tags.count != 0; 6968 } 6969 6970 tags = &iter->note->tags; 6971 6972 if (++iter->index < tags->count) { 6973 uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); 6974 iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size); 6975 return 1; 6976 } 6977 6978 return 0; 6979 } 6980 6981 uint16_t ndb_tags_count(struct ndb_tags *tags) 6982 { 6983 return tags->count; 6984 } 6985 6986 uint16_t ndb_tag_count(struct ndb_tag *tags) 6987 { 6988 return tags->count; 6989 } 6990 6991 enum ndb_common_kind ndb_kind_to_common_kind(int kind) 6992 { 6993 switch (kind) 6994 { 6995 case 0: return NDB_CKIND_PROFILE; 6996 case 1: return NDB_CKIND_TEXT; 6997 case 3: return NDB_CKIND_CONTACTS; 6998 case 4: return NDB_CKIND_DM; 6999 case 5: return NDB_CKIND_DELETE; 7000 case 6: return NDB_CKIND_REPOST; 7001 case 7: return NDB_CKIND_REACTION; 7002 case 9735: return NDB_CKIND_ZAP; 7003 case 9734: return NDB_CKIND_ZAP_REQUEST; 7004 case 23194: return NDB_CKIND_NWC_REQUEST; 7005 case 23195: return NDB_CKIND_NWC_RESPONSE; 7006 case 27235: return NDB_CKIND_HTTP_AUTH; 7007 case 30000: return NDB_CKIND_LIST; 7008 case 30023: return NDB_CKIND_LONGFORM; 7009 case 30315: return NDB_CKIND_STATUS; 7010 } 7011 7012 return -1; 7013 } 7014 7015 const char *ndb_kind_name(enum ndb_common_kind ck) 7016 { 7017 switch (ck) { 7018 case NDB_CKIND_PROFILE: return "profile"; 7019 case NDB_CKIND_TEXT: return "text"; 7020 case NDB_CKIND_CONTACTS: return "contacts"; 7021 case NDB_CKIND_DM: return "dm"; 7022 case NDB_CKIND_DELETE: return "delete"; 7023 case NDB_CKIND_REPOST: return "repost"; 7024 case NDB_CKIND_REACTION: return "reaction"; 7025 case NDB_CKIND_ZAP: return "zap"; 7026 case NDB_CKIND_ZAP_REQUEST: return "zap_request"; 7027 case NDB_CKIND_NWC_REQUEST: return "nwc_request"; 7028 case NDB_CKIND_NWC_RESPONSE: return "nwc_response"; 7029 case NDB_CKIND_HTTP_AUTH: return "http_auth"; 7030 case NDB_CKIND_LIST: return "list"; 7031 case NDB_CKIND_LONGFORM: return "longform"; 7032 case NDB_CKIND_STATUS: return "status"; 7033 case NDB_CKIND_COUNT: return "unknown"; 7034 } 7035 7036 return "unknown"; 7037 } 7038 7039 const char *ndb_db_name(enum ndb_dbs db) 7040 { 7041 switch (db) { 7042 case NDB_DB_NOTE: 7043 return "note"; 7044 case NDB_DB_META: 7045 return "note_metadata"; 7046 case NDB_DB_PROFILE: 7047 return "profile"; 7048 case NDB_DB_NOTE_ID: 7049 return "note_index"; 7050 case NDB_DB_PROFILE_PK: 7051 return "profile_pubkey_index"; 7052 case NDB_DB_NDB_META: 7053 return "nostrdb_metadata"; 7054 case NDB_DB_PROFILE_SEARCH: 7055 return "profile_search"; 7056 case NDB_DB_PROFILE_LAST_FETCH: 7057 return "profile_last_fetch"; 7058 case NDB_DB_NOTE_KIND: 7059 return "note_kind_index"; 7060 case NDB_DB_NOTE_TEXT: 7061 return "note_fulltext"; 7062 case NDB_DB_NOTE_BLOCKS: 7063 return "note_blocks"; 7064 case NDB_DB_NOTE_TAGS: 7065 return "note_tags"; 7066 case NDB_DB_NOTE_PUBKEY: 7067 return "note_pubkey_index"; 7068 case NDB_DB_NOTE_PUBKEY_KIND: 7069 return "note_pubkey_kind_index"; 7070 case NDB_DBS: 7071 return "count"; 7072 } 7073 7074 return "unknown"; 7075 } 7076 7077 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note) 7078 { 7079 const char *content; 7080 size_t content_len; 7081 struct ndb_blocks *blocks; 7082 7083 content = ndb_note_content(note); 7084 content_len = ndb_note_content_length(note); 7085 7086 // something weird is going on 7087 if (content_len >= INT32_MAX) 7088 return NULL; 7089 7090 unsigned char *buffer = malloc(content_len); 7091 if (!buffer) 7092 return NULL; 7093 7094 if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) { 7095 free(buffer); 7096 return NULL; 7097 } 7098 7099 //blocks = realloc(blocks, ndb_blocks_total_size(blocks)); 7100 //if (blocks == NULL) 7101 //return NULL; 7102 7103 blocks->flags |= NDB_BLOCK_FLAG_OWNED; 7104 7105 return blocks; 7106 } 7107 7108 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key) 7109 { 7110 struct ndb_blocks *blocks, *blocks_to_writer; 7111 size_t blocks_size; 7112 struct ndb_note *note; 7113 size_t note_len; 7114 7115 if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, ¬e_len))) { 7116 return blocks; 7117 } 7118 7119 // If we don't have note blocks, let's lazily generate them. This is 7120 // migration-friendly instead of doing them all at once 7121 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_len))) { 7122 // no note found, can't return note blocks 7123 return NULL; 7124 } 7125 7126 if (!(blocks = ndb_note_to_blocks(note))) 7127 return NULL; 7128 7129 // send a copy to the writer 7130 blocks_size = ndb_blocks_total_size(blocks); 7131 blocks_to_writer = malloc(blocks_size); 7132 memcpy(blocks_to_writer, blocks, blocks_size); 7133 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED); 7134 7135 // we generated new blocks, let's store them in the DB 7136 struct ndb_writer_blocks write_blocks = { 7137 .blocks = blocks_to_writer, 7138 .note_key = note_key 7139 }; 7140 7141 assert(write_blocks.blocks != blocks); 7142 7143 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS }; 7144 msg.blocks = write_blocks; 7145 7146 ndb_writer_queue_msg(&ndb->writer, &msg); 7147 7148 return blocks; 7149 } 7150 7151 // please call ndb_monitor_lock before calling this 7152 static struct ndb_subscription * 7153 ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index) 7154 { 7155 struct ndb_subscription *sub, *tsub; 7156 int i; 7157 7158 for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) { 7159 tsub = &monitor->subscriptions[i]; 7160 if (tsub->subid == subid) { 7161 sub = tsub; 7162 if (index) 7163 *index = i; 7164 break; 7165 } 7166 } 7167 7168 return sub; 7169 } 7170 7171 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 7172 int note_id_capacity) 7173 { 7174 struct ndb_subscription *sub; 7175 int res; 7176 7177 if (subid == 0) 7178 return 0; 7179 7180 ndb_monitor_lock(&ndb->monitor); 7181 7182 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) 7183 res = 0; 7184 else 7185 res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); 7186 7187 ndb_monitor_unlock(&ndb->monitor); 7188 7189 return res; 7190 } 7191 7192 int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 7193 int note_id_capacity) 7194 { 7195 struct ndb_subscription *sub; 7196 struct prot_queue *queue_inbox; 7197 7198 // this is not a valid subscription id 7199 if (subid == 0) 7200 return 0; 7201 7202 ndb_monitor_lock(&ndb->monitor); 7203 7204 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) { 7205 ndb_monitor_unlock(&ndb->monitor); 7206 return 0; 7207 } 7208 7209 queue_inbox = &sub->inbox; 7210 7211 ndb_monitor_unlock(&ndb->monitor); 7212 7213 // there is technically a race condition if the thread yeilds at this 7214 // comment and a subscription is added/removed. A deadlock in the 7215 // writer queue would be much worse though. This function is dubious 7216 // anyways. 7217 7218 return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity); 7219 } 7220 7221 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) 7222 { 7223 struct ndb_subscription *sub; 7224 int index, res, elems_to_move; 7225 7226 ndb_monitor_lock(&ndb->monitor); 7227 7228 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) { 7229 res = 0; 7230 goto done; 7231 } 7232 7233 ndb_subscription_destroy(sub); 7234 7235 elems_to_move = (--ndb->monitor.num_subscriptions) - index; 7236 7237 memmove(&ndb->monitor.subscriptions[index], 7238 &ndb->monitor.subscriptions[index+1], 7239 elems_to_move * sizeof(*sub)); 7240 7241 res = 1; 7242 7243 done: 7244 ndb_monitor_unlock(&ndb->monitor); 7245 7246 return res; 7247 } 7248 7249 int ndb_num_subscriptions(struct ndb *ndb) 7250 { 7251 return ndb->monitor.num_subscriptions; 7252 } 7253 7254 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters) 7255 { 7256 static uint64_t subids = 0; 7257 struct ndb_subscription *sub; 7258 size_t buflen; 7259 uint64_t subid; 7260 char *buf; 7261 7262 ndb_monitor_lock(&ndb->monitor); 7263 7264 if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) { 7265 fprintf(stderr, "too many subscriptions\n"); 7266 subid = 0; 7267 goto done; 7268 } 7269 7270 sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions]; 7271 subid = ++subids; 7272 sub->subid = subid; 7273 7274 ndb_filter_group_init(&sub->group); 7275 if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) { 7276 subid = 0; 7277 goto done; 7278 } 7279 7280 // 500k ought to be enough for anyone 7281 buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE; 7282 buf = malloc(buflen); 7283 7284 if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) { 7285 fprintf(stderr, "failed to push prot queue\n"); 7286 subid = 0; 7287 goto done; 7288 } 7289 7290 ndb->monitor.num_subscriptions++; 7291 done: 7292 ndb_monitor_unlock(&ndb->monitor); 7293 7294 return subid; 7295 }