nostrdb.c (174139B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "ccan/crypto/sha256/sha256.h" 8 #include "bolt11/bolt11.h" 9 #include "bolt11/amount.h" 10 #include "lmdb.h" 11 #include "util.h" 12 #include "cpu.h" 13 #include "block.h" 14 #include "threadpool.h" 15 #include "thread.h" 16 #include "protected_queue.h" 17 #include "memchr.h" 18 #include "print_util.h" 19 #include <stdlib.h> 20 #include <limits.h> 21 #include <assert.h> 22 23 #include "bindings/c/profile_json_parser.h" 24 #include "bindings/c/profile_builder.h" 25 #include "bindings/c/meta_builder.h" 26 #include "bindings/c/meta_reader.h" 27 #include "bindings/c/profile_verifier.h" 28 #include "secp256k1.h" 29 #include "secp256k1_ecdh.h" 30 #include "secp256k1_schnorrsig.h" 31 32 #define max(a,b) ((a) > (b) ? (a) : (b)) 33 #define min(a,b) ((a) < (b) ? (a) : (b)) 34 35 // the maximum number of things threads pop and push in bulk 36 #define THREAD_QUEUE_BATCH 4096 37 38 // maximum number of active subscriptions 39 #define MAX_SUBSCRIPTIONS 256 40 #define MAX_SCAN_CURSORS 12 41 #define MAX_FILTERS 16 42 43 // the maximum size of inbox queues 44 static const int DEFAULT_QUEUE_SIZE = 32768; 45 46 // increase if we need bigger filters 47 #define NDB_FILTER_PAGES 64 48 49 #define ndb_flag_set(flags, f) ((flags & f) == f) 50 51 #define NDB_PARSED_ID (1 << 0) 52 #define NDB_PARSED_PUBKEY (1 << 1) 53 #define NDB_PARSED_SIG (1 << 2) 54 #define NDB_PARSED_CREATED_AT (1 << 3) 55 #define NDB_PARSED_KIND (1 << 4) 56 #define NDB_PARSED_CONTENT (1 << 5) 57 #define NDB_PARSED_TAGS (1 << 6) 58 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 59 60 typedef int (*ndb_migrate_fn)(struct ndb_txn *); 61 typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, 62 int word_index); 63 64 // these must be byte-aligned, they are directly accessing the serialized 
// data representation
#pragma pack(push, 1)

// A 4-byte string reference stored inline in a note. Either up to three
// inline characters plus a flag byte (`packed`), or a 32-bit `offset`
// (presumably into the note's string table -- TODO confirm). `bytes` gives
// raw access to the same four bytes.
union ndb_packed_str {
	struct {
		char str[3];
		// we assume little endian everywhere. sorry not sorry.
		unsigned char flag; // NDB_PACKED_STR, etc
	} packed;

	uint32_t offset;
	unsigned char bytes[4];
};

// One serialized tag: `count` entries followed by that many packed strings
// (the zero-length array marks the variable-length tail).
struct ndb_tag {
	uint16_t count;
	union ndb_packed_str strs[0];
};

// Header of a note's tag list: the number of tags. Presumably the tags
// themselves follow it in memory -- see the ndb_tags_iterate_* helpers.
struct ndb_tags {
	uint16_t padding;
	uint16_t count;
};

// v1
// Serialized note header. It is read directly from stored bytes, which is why
// it lives inside #pragma pack(1).
struct ndb_note {
	unsigned char version;    // v=1
	unsigned char padding[3]; // keep things aligned
	unsigned char id[32];
	unsigned char pubkey[32];
	unsigned char sig[64];

	uint64_t created_at;
	uint32_t kind;
	uint32_t content_length;
	union ndb_packed_str content;
	uint32_t strings;
	// nothing can come after tags since it contains variadic data
	struct ndb_tags tags;
};

#pragma pack(pop)


// Wraps a single migration function.
struct ndb_migration {
	ndb_migrate_fn fn;
};

// A flatcc builder together with a buffer pointer (presumably the finished
// flatbuffer it produced -- TODO confirm).
struct ndb_profile_record_builder {
	flatcc_builder_t *builder;
	void *flatbuf;
};

// controls whether to continue or stop the json parser
enum ndb_idres {
	NDB_IDRES_CONT,
	NDB_IDRES_STOP,
};

// closure data for the id-detecting ingest controller
struct ndb_ingest_controller
{
	MDB_txn *read_txn;
	struct ndb_lmdb *lmdb;
};

// Message types understood by the writer thread.
enum ndb_writer_msgtype {
	NDB_WRITER_QUIT, // kill thread immediately
	NDB_WRITER_NOTE, // write a note to the db
	NDB_WRITER_PROFILE, // write a profile to the db
	NDB_WRITER_DBMETA, // write ndb metadata
	NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
	NDB_WRITER_BLOCKS, // write parsed note blocks
	NDB_WRITER_MIGRATE, // migrate the database
};

// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
	NDB_META_KEY_VERSION = 1
};

// State for parsing one JSON note. It holds the input text and its length,
// the note builder, and the jsmn tokenizer with its token array
// [toks, toks_end). `i` is presumably the current token index -- TODO confirm.
struct ndb_json_parser {
	const char *json;
	int json_len;
	struct ndb_builder builder;
	jsmn_parser json_parser;
	jsmntok_t *toks, *toks_end;
	int i;
	int num_tokens;
};

// useful to pass to threads on its own
// The LMDB environment plus one database handle per NDB_DBS entry.
struct ndb_lmdb {
	MDB_env *env;
	MDB_dbi dbs[NDB_DBS];
};

// Writer thread state: shared lmdb/monitor pointers, setting flags, the
// thread id, and an inbox queue. Presumably `queue_buf`/`queue_buflen` back
// that inbox -- confirm in the writer init code.
struct ndb_writer {
	struct ndb_lmdb *lmdb;
	struct ndb_monitor *monitor;

	uint32_t ndb_flags;
	void *queue_buf;
	int queue_buflen;
	pthread_t thread_id;

	struct prot_queue inbox;
};

// Ingester state: a thread pool, a pointer to the writer's inbox, and an
// optional ingest filter callback with its context.
struct ndb_ingester {
	struct ndb_lmdb *lmdb;
	uint32_t flags;
	struct threadpool tp;
	struct prot_queue *writer_inbox;
	void *filter_context;
	ndb_ingest_filter_fn filter;
};

// A fixed-capacity set of up to MAX_FILTERS filters; `num_filters` are in use.
struct ndb_filter_group {
	struct ndb_filter filters[MAX_FILTERS];
	int num_filters;
};

// A subscription: its id, the filters it matches against, and an inbox queue
// (presumably of matching note keys -- TODO confirm).
struct ndb_subscription {
	uint64_t subid;
	struct ndb_filter_group group;
	struct prot_queue inbox;
};

// Up to MAX_SUBSCRIPTIONS active subscriptions plus an optional callback
// (`sub_cb` with context `sub_cb_ctx`).
struct ndb_monitor {
	struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
	ndb_sub_fn sub_cb;
	void *sub_cb_ctx;
	int num_subscriptions;

	// monitor isn't a full inbox. We want pollers to be able to poll
	// subscriptions efficiently without going through a message queue, so
	// we use a simple mutex here.
	pthread_mutex_t mutex;
};

// Top-level database handle aggregating the lmdb handles, ingester,
// subscription monitor and writer.
struct ndb {
	struct ndb_lmdb lmdb;
	struct ndb_ingester ingester;
	struct ndb_monitor monitor;
	struct ndb_writer writer;
	int version;
	uint32_t flags; // setting flags
	// lmdb environ handles, etc
};

///
/// Query Plans
///
/// There are general strategies for performing certain types of query
/// depending on the filter. For example, for large contact list queries
/// with many authors, we simply do a descending scan on created_at
/// instead of doing 1000s of pubkey scans.
223 /// 224 /// Query plans are calculated from filters via `ndb_filter_plan` 225 /// 226 enum ndb_query_plan { 227 NDB_PLAN_KINDS, 228 NDB_PLAN_IDS, 229 NDB_PLAN_AUTHORS, 230 NDB_PLAN_AUTHOR_KINDS, 231 NDB_PLAN_CREATED, 232 NDB_PLAN_TAGS, 233 }; 234 235 // A id + u64 + timestamp 236 struct ndb_id_u64_ts { 237 unsigned char id[32]; // pubkey, id, etc 238 uint64_t u64; // kind, etc 239 uint64_t timestamp; 240 }; 241 242 // A clustered key with an id and a timestamp 243 struct ndb_tsid { 244 unsigned char id[32]; 245 uint64_t timestamp; 246 }; 247 248 // A u64 + timestamp id. Just using this for kinds at the moment. 249 struct ndb_u64_ts { 250 uint64_t u64; // kind, etc 251 uint64_t timestamp; 252 }; 253 254 struct ndb_word 255 { 256 const char *word; 257 int word_len; 258 }; 259 260 struct ndb_search_words 261 { 262 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 263 int num_words; 264 }; 265 266 // ndb_text_search_key 267 // 268 // This is compressed when in lmdb: 269 // 270 // note_id: varint 271 // strlen: varint 272 // str: cstr 273 // timestamp: varint 274 // word_index: varint 275 // 276 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 277 int word_index, int word_len, const char *str, 278 uint64_t timestamp, uint64_t note_id, 279 int *keysize) 280 { 281 struct cursor cur; 282 make_cursor(buf, buf + bufsize, &cur); 283 284 // TODO: need update this to uint64_t 285 // we push this first because our query function can pull this off 286 // quickly to check matches 287 if (!cursor_push_varint(&cur, (int32_t)note_id)) 288 return 0; 289 290 // string length 291 if (!cursor_push_varint(&cur, word_len)) 292 return 0; 293 294 // non-null terminated, lowercase string 295 if (!cursor_push_lowercase(&cur, str, word_len)) 296 return 0; 297 298 // TODO: need update this to uint64_t 299 if (!cursor_push_varint(&cur, (int)timestamp)) 300 return 0; 301 302 // the index of the word in the content so that we can do more accurate 303 // phrase searches 304 if 
(!cursor_push_varint(&cur, word_index)) 305 return 0; 306 307 // pad to 8-byte alignment 308 if (!cursor_align(&cur, 8)) 309 return 0; 310 311 *keysize = cur.p - cur.start; 312 assert((*keysize % 8) == 0); 313 314 return 1; 315 } 316 317 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 318 int wordlen, const char *word, 319 uint64_t timestamp, uint64_t note_id, 320 int *keysize) 321 { 322 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 323 timestamp, note_id, keysize); 324 } 325 326 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 327 int wordlen, const char *word, 328 int *keysize) 329 { 330 uint64_t timestamp, note_id; 331 timestamp = 0; 332 note_id = 0; 333 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 334 timestamp, note_id, keysize); 335 } 336 337 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 338 int wordlen, const char *word, 339 int *keysize) 340 { 341 uint64_t timestamp, note_id; 342 timestamp = INT32_MAX; 343 note_id = INT32_MAX; 344 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 345 timestamp, note_id, keysize); 346 } 347 348 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, int *keysize); 349 350 /** From LMDB: Compare two items lexically */ 351 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 352 int diff; 353 ssize_t len_diff; 354 unsigned int len; 355 356 len = a->mv_size; 357 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 358 if (len_diff > 0) { 359 len = b->mv_size; 360 len_diff = 1; 361 } 362 363 diff = memcmp(a->mv_data, b->mv_data, len); 364 return diff ? diff : len_diff<0 ? 
-1 : len_diff; 365 } 366 367 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) 368 { 369 MDB_val va, vb; 370 uint64_t ts_a, ts_b; 371 int cmp; 372 373 va.mv_data = a->mv_data; 374 va.mv_size = a->mv_size - 8; 375 376 vb.mv_data = b->mv_data; 377 vb.mv_size = b->mv_size - 8; 378 379 if ((cmp = mdb_cmp_memn(&va, &vb))) 380 return cmp; 381 382 ts_a = *(uint64_t*)((unsigned char *)va.mv_data + va.mv_size); 383 ts_b = *(uint64_t*)((unsigned char *)vb.mv_data + vb.mv_size); 384 385 if (ts_a < ts_b) 386 return -1; 387 else if (ts_a > ts_b) 388 return 1; 389 390 return 0; 391 } 392 393 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 394 { 395 struct cursor ca, cb; 396 uint64_t sa, sb, nid_a, nid_b; 397 MDB_val a2, b2; 398 399 make_cursor(a->mv_data, (unsigned char *)a->mv_data + a->mv_size, &ca); 400 make_cursor(b->mv_data, (unsigned char *)b->mv_data + b->mv_size, &cb); 401 402 // note_id 403 if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b))) 404 return 0; 405 406 // string size 407 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 408 return 0; 409 410 a2.mv_data = ca.p; 411 a2.mv_size = sa; 412 413 b2.mv_data = cb.p; 414 b2.mv_size = sb; 415 416 int cmp = mdb_cmp_memn(&a2, &b2); 417 if (cmp) return cmp; 418 419 // skip over string 420 ca.p += sa; 421 cb.p += sb; 422 423 // timestamp 424 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 425 return 0; 426 427 if (sa < sb) return -1; 428 else if (sa > sb) return 1; 429 430 // note_id 431 if (nid_a < nid_b) return -1; 432 else if (nid_a > nid_b) return 1; 433 434 // word index 435 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 436 return 0; 437 438 if (sa < sb) return -1; 439 else if (sa > sb) return 1; 440 441 return 0; 442 } 443 444 static inline int ndb_unpack_text_search_key_noteid( 445 struct cursor *cur, uint64_t *note_id) 446 { 447 if 
(!cursor_pull_varint(cur, note_id)) 448 return 0; 449 450 return 1; 451 } 452 453 // faster peek of just the string instead of unpacking everything 454 // this is used to quickly discard range query matches if there is no 455 // common prefix 456 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 457 const char **str, 458 int *str_len) 459 { 460 uint64_t len; 461 462 if (!cursor_pull_varint(cur, &len)) 463 return 0; 464 465 *str_len = len; 466 467 *str = (const char *)cur->p; 468 469 if (!cursor_skip(cur, *str_len)) 470 return 0; 471 472 return 1; 473 } 474 475 // should be called after ndb_unpack_text_search_key_string. It continues 476 // the unpacking of a text search key if we've already started it. 477 static inline int 478 ndb_unpack_remaining_text_search_key(struct cursor *cur, 479 struct ndb_text_search_key *key) 480 { 481 if (!cursor_pull_varint(cur, &key->timestamp)) 482 return 0; 483 484 if (!cursor_pull_varint(cur, &key->word_index)) 485 return 0; 486 487 return 1; 488 } 489 490 // unpack a fulltext search key 491 // 492 // full version of string + unpack remaining. This is split up because text 493 // searching only requires to pull the string for prefix searching, and the 494 // remaining is optional 495 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 496 struct ndb_text_search_key *key) 497 { 498 struct cursor c; 499 make_cursor(p, p + len, &c); 500 501 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 502 return 0; 503 504 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 505 return 0; 506 507 return ndb_unpack_remaining_text_search_key(&c, key); 508 } 509 510 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 511 // `dst` and `src` are pointers to the destination and source strings, respectively. 512 // `n` is the maximum number of characters to copy. 
513 static void lowercase_strncpy(char *dst, const char *src, int n) { 514 int j = 0, i = 0; 515 516 if (!dst || !src || n == 0) { 517 return; 518 } 519 520 while (src[i] != '\0' && j < n) { 521 dst[j++] = tolower(src[i++]); 522 } 523 524 // Null-terminate and fill the destination string 525 while (j < n) { 526 dst[j++] = '\0'; 527 } 528 } 529 530 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) { 531 return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID; 532 } 533 534 // Copy the filter 535 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src) 536 { 537 size_t src_size, elem_size, data_size; 538 539 memcpy(dst, src, sizeof(*src)); 540 541 elem_size = src->elem_buf.end - src->elem_buf.start; 542 data_size = src->data_buf.end - src->data_buf.start; 543 src_size = data_size + elem_size; 544 545 // let's only allow finalized filters to be cloned 546 if (!src || !src->finalized) 547 return 0; 548 549 dst->elem_buf.start = malloc(src_size); 550 dst->elem_buf.end = dst->elem_buf.start + elem_size; 551 dst->elem_buf.p = dst->elem_buf.end; 552 553 dst->data_buf.start = dst->elem_buf.start + elem_size; 554 dst->data_buf.end = dst->data_buf.start + data_size; 555 dst->data_buf.p = dst->data_buf.end; 556 557 if (dst->elem_buf.start == NULL) 558 return 0; 559 560 memcpy(dst->elem_buf.start, src->elem_buf.start, src_size); 561 562 return 1; 563 } 564 565 // "Finalize" the filter. This resizes the allocated heap buffers so that they 566 // are as small as possible. 
This also prevents new fields from being added 567 int ndb_filter_end(struct ndb_filter *filter) 568 { 569 #ifdef DEBUG 570 size_t orig_size; 571 #endif 572 size_t data_len, elem_len; 573 if (filter->finalized == 1) 574 return 0; 575 576 // move the data buffer to the end of the element buffer and update 577 // all of the element pointers accordingly 578 data_len = filter->data_buf.p - filter->data_buf.start; 579 elem_len = filter->elem_buf.p - filter->elem_buf.start; 580 #ifdef DEBUG 581 orig_size = filter->data_buf.end - filter->elem_buf.start; 582 #endif 583 584 // cap the elem buff 585 filter->elem_buf.end = filter->elem_buf.p; 586 587 // move the data buffer to the end of the element buffer 588 memmove(filter->elem_buf.p, filter->data_buf.start, data_len); 589 590 // realloc the whole thing 591 filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len); 592 filter->elem_buf.end = filter->elem_buf.start + elem_len; 593 filter->elem_buf.p = filter->elem_buf.end; 594 595 filter->data_buf.start = filter->elem_buf.end; 596 filter->data_buf.end = filter->data_buf.start + data_len; 597 filter->data_buf.p = filter->data_buf.end; 598 599 filter->finalized = 1; 600 601 ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len); 602 603 return 1; 604 } 605 606 static inline struct ndb_filter_elements * 607 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset) 608 { 609 struct ndb_filter_elements *els; 610 611 if (offset < 0) 612 return NULL; 613 614 els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset); 615 616 if ((unsigned char *)els > filter->elem_buf.p) 617 return NULL; 618 619 return els; 620 } 621 622 struct ndb_filter_elements * 623 ndb_filter_current_element(const struct ndb_filter *filter) 624 { 625 return ndb_filter_get_elements_by_offset(filter, filter->current); 626 } 627 628 struct ndb_filter_elements * 629 ndb_filter_get_elements(const struct ndb_filter *filter, int index) 630 { 631 
if (filter->num_elements <= 0) 632 return NULL; 633 634 if (index > filter->num_elements-1) 635 return NULL; 636 637 return ndb_filter_get_elements_by_offset(filter, filter->elements[index]); 638 } 639 640 static inline unsigned char * 641 ndb_filter_elements_data(const struct ndb_filter *filter, int offset) 642 { 643 unsigned char *data; 644 645 if (offset < 0) 646 return NULL; 647 648 data = filter->data_buf.start + offset; 649 if (data > filter->data_buf.p) 650 return NULL; 651 652 return data; 653 } 654 655 unsigned char * 656 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 657 { 658 return ndb_filter_elements_data(filter, els->elements[index]); 659 } 660 661 const char * 662 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 663 { 664 return (const char *)ndb_filter_elements_data(filter, els->elements[index]); 665 } 666 667 uint64_t * 668 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index) 669 { 670 return &els->elements[index]; 671 } 672 673 uint64_t 674 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index) 675 { 676 return els->elements[index]; 677 } 678 679 int ndb_filter_init(struct ndb_filter *filter) 680 { 681 struct cursor cur; 682 int page_size, elem_pages, data_pages, buf_size; 683 684 page_size = 4096; // assuming this, not a big deal if we're wrong 685 elem_pages = NDB_FILTER_PAGES / 4; 686 data_pages = NDB_FILTER_PAGES - elem_pages; 687 buf_size = page_size * NDB_FILTER_PAGES; 688 689 unsigned char *buf = malloc(buf_size); 690 if (!buf) 691 return 0; 692 693 // init memory arena for the cursor 694 make_cursor(buf, buf + buf_size, &cur); 695 696 cursor_slice(&cur, &filter->elem_buf, page_size * elem_pages); 697 cursor_slice(&cur, &filter->data_buf, page_size * data_pages); 698 699 // make sure we are fully allocated 700 assert(cur.p == cur.end); 701 702 // make sure elem_buf is the 
start of the buffer 703 assert(filter->elem_buf.start == cur.start); 704 705 filter->num_elements = 0; 706 filter->elements[0] = 0; 707 filter->current = -1; 708 filter->finalized = 0; 709 710 return 1; 711 } 712 713 void ndb_filter_destroy(struct ndb_filter *filter) 714 { 715 if (filter->elem_buf.start) 716 free(filter->elem_buf.start); 717 718 memset(filter, 0, sizeof(*filter)); 719 } 720 721 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 722 { 723 switch (field) { 724 case NDB_FILTER_IDS: return "ids"; 725 case NDB_FILTER_AUTHORS: return "authors"; 726 case NDB_FILTER_KINDS: return "kinds"; 727 case NDB_FILTER_TAGS: return "tags"; 728 case NDB_FILTER_SINCE: return "since"; 729 case NDB_FILTER_UNTIL: return "until"; 730 case NDB_FILTER_LIMIT: return "limit"; 731 } 732 733 return "unknown"; 734 } 735 736 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag) 737 { 738 int i; 739 struct ndb_filter_elements *els, *el; 740 741 if (ndb_filter_current_element(filter)) { 742 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 743 return 0; 744 } 745 746 // you can only start and end fields once 747 for (i = 0; i < filter->num_elements; i++) { 748 el = ndb_filter_get_elements(filter, i); 749 assert(el); 750 // TODO: fix this tags check to try to find matching tags 751 if (el->field.type == field && field != NDB_FILTER_TAGS) { 752 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 753 ndb_filter_field_name(field)); 754 return 0; 755 } 756 } 757 758 filter->current = filter->elem_buf.p - filter->elem_buf.start; 759 els = ndb_filter_current_element(filter); 760 assert(els); 761 762 // advance elem buffer to the variable data section 763 if (!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 764 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 765 
ndb_filter_field_name(field)); 766 return 0; 767 } 768 769 els->field.type = field; 770 els->field.tag = tag; 771 els->field.elem_type = 0; 772 els->count = 0; 773 774 return 1; 775 } 776 777 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 778 { 779 return ndb_filter_start_field_impl(filter, field, 0); 780 } 781 782 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag) 783 { 784 return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag); 785 } 786 787 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 788 { 789 struct ndb_filter_elements *current; 790 uint64_t offset; 791 792 if (!(current = ndb_filter_current_element(filter))) 793 return 0; 794 795 offset = filter->data_buf.p - filter->data_buf.start; 796 797 switch (current->field.type) { 798 case NDB_FILTER_IDS: 799 case NDB_FILTER_AUTHORS: 800 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 801 return 0; 802 break; 803 case NDB_FILTER_KINDS: 804 offset = el.integer; 805 break; 806 case NDB_FILTER_SINCE: 807 case NDB_FILTER_UNTIL: 808 case NDB_FILTER_LIMIT: 809 // only one allowed for since/until 810 if (current->count != 0) 811 return 0; 812 offset = el.integer; 813 break; 814 case NDB_FILTER_TAGS: 815 switch (current->field.elem_type) { 816 case NDB_ELEMENT_ID: 817 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 818 return 0; 819 break; 820 case NDB_ELEMENT_STRING: 821 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 822 return 0; 823 if (!cursor_push_byte(&filter->data_buf, 0)) 824 return 0; 825 break; 826 case NDB_ELEMENT_INT: 827 // ints are not allowed in tag filters 828 case NDB_ELEMENT_UNKNOWN: 829 return 0; 830 } 831 // push a pointer of the string in the databuf as an element 832 break; 833 } 834 835 if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset, 836 sizeof(offset))) { 837 return 0; 838 } 839 840 current->count++; 841 
842 return 1; 843 } 844 845 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 846 enum ndb_generic_element_type elem_type) 847 { 848 enum ndb_generic_element_type current_elem_type; 849 struct ndb_filter_elements *current; 850 851 if (!(current = ndb_filter_current_element(filter))) 852 return 0; 853 854 current_elem_type = current->field.elem_type; 855 856 // element types must be uniform 857 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 858 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 859 return 0; 860 } 861 862 current->field.elem_type = elem_type; 863 864 return 1; 865 } 866 867 868 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len) 869 { 870 union ndb_filter_element el; 871 struct ndb_filter_elements *current; 872 873 if (!(current = ndb_filter_current_element(filter))) 874 return 0; 875 876 // only generic queries are allowed to have strings 877 switch (current->field.type) { 878 case NDB_FILTER_SINCE: 879 case NDB_FILTER_UNTIL: 880 case NDB_FILTER_LIMIT: 881 case NDB_FILTER_IDS: 882 case NDB_FILTER_AUTHORS: 883 case NDB_FILTER_KINDS: 884 return 0; 885 case NDB_FILTER_TAGS: 886 break; 887 } 888 889 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 890 return 0; 891 892 el.string.string = str; 893 el.string.len = len; 894 895 return ndb_filter_add_element(filter, el); 896 } 897 898 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 899 { 900 return ndb_filter_add_str_element_len(filter, str, strlen(str)); 901 } 902 903 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 904 { 905 union ndb_filter_element el; 906 struct ndb_filter_elements *current; 907 if (!(current = ndb_filter_current_element(filter))) 908 return 0; 909 910 switch (current->field.type) { 911 case NDB_FILTER_IDS: 912 case NDB_FILTER_AUTHORS: 913 case NDB_FILTER_TAGS: 914 return 0; 915 case NDB_FILTER_KINDS: 916 case 
NDB_FILTER_SINCE: 917 case NDB_FILTER_UNTIL: 918 case NDB_FILTER_LIMIT: 919 break; 920 } 921 922 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT)) 923 return 0; 924 925 el.integer = integer; 926 927 return ndb_filter_add_element(filter, el); 928 } 929 930 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 931 { 932 union ndb_filter_element el; 933 struct ndb_filter_elements *current; 934 935 if (!(current = ndb_filter_current_element(filter))) 936 return 0; 937 938 // only certain filter types allow pushing id elements 939 switch (current->field.type) { 940 case NDB_FILTER_SINCE: 941 case NDB_FILTER_UNTIL: 942 case NDB_FILTER_LIMIT: 943 case NDB_FILTER_KINDS: 944 return 0; 945 case NDB_FILTER_IDS: 946 case NDB_FILTER_AUTHORS: 947 case NDB_FILTER_TAGS: 948 break; 949 } 950 951 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 952 return 0; 953 954 // this is needed so that generic filters know its an id 955 el.id = id; 956 957 return ndb_filter_add_element(filter, el); 958 } 959 960 static int ndb_tag_filter_matches(struct ndb_filter *filter, 961 struct ndb_filter_elements *els, 962 struct ndb_note *note) 963 { 964 int i; 965 const unsigned char *id; 966 const char *el_str; 967 struct ndb_iterator iter, *it = &iter; 968 struct ndb_str str; 969 970 ndb_tags_iterate_start(note, it); 971 972 while (ndb_tags_iterate_next(it)) { 973 // we're looking for tags with 2 or more entries: ["p", id], etc 974 if (it->tag->count < 2) 975 continue; 976 977 str = ndb_tag_str(note, it->tag, 0); 978 979 // we only care about packed strings (single char, etc) 980 if (str.flag != NDB_PACKED_STR) 981 continue; 982 983 // do we have #e matching e (or p, etc) 984 if (str.str[0] != els->field.tag || str.str[1] != 0) 985 continue; 986 987 str = ndb_tag_str(note, it->tag, 1); 988 989 switch (els->field.elem_type) { 990 case NDB_ELEMENT_ID: 991 // if our filter element type is an id, then we 992 // expect a packed id in the tag, otherwise skip 993 if 
(str.flag != NDB_PACKED_ID) 994 continue; 995 break; 996 case NDB_ELEMENT_STRING: 997 // if our filter element type is a string, then 998 // we should not expect an id 999 if (str.flag == NDB_PACKED_ID) 1000 continue; 1001 1002 break; 1003 case NDB_ELEMENT_UNKNOWN: 1004 default: 1005 // For some reason the element type is not set. It's 1006 // possible nothing was added to the generic filter? 1007 // Let's just fail here and log a note for debugging 1008 fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type); 1009 return 0; 1010 } 1011 1012 for (i = 0; i < els->count; i++) { 1013 switch (els->field.elem_type) { 1014 case NDB_ELEMENT_ID: 1015 id = ndb_filter_get_id_element(filter, els, i); 1016 if (!memcmp(id, str.id, 32)) 1017 return 1; 1018 break; 1019 case NDB_ELEMENT_STRING: 1020 el_str = ndb_filter_get_string_element(filter, els, i); 1021 if (!strcmp(el_str, str.str)) 1022 return 1; 1023 break; 1024 case NDB_ELEMENT_INT: 1025 // int elements int tag queries are not supported 1026 case NDB_ELEMENT_UNKNOWN: 1027 return 0; 1028 } 1029 } 1030 } 1031 1032 return 0; 1033 } 1034 1035 struct search_id_state { 1036 struct ndb_filter *filter; 1037 struct ndb_filter_elements *els; 1038 unsigned char *key; 1039 }; 1040 1041 static int compare_ids(const void *pa, const void *pb) 1042 { 1043 unsigned char *a = *(unsigned char **)pa; 1044 unsigned char *b = *(unsigned char **)pb; 1045 1046 return memcmp(a, b, 32); 1047 } 1048 1049 static int search_ids(const void *ctx, const void *mid_ptr) 1050 { 1051 struct search_id_state *state; 1052 unsigned char *mid_id; 1053 uint32_t mid; 1054 1055 state = (struct search_id_state *)ctx; 1056 mid = *(uint32_t *)mid_ptr; 1057 1058 mid_id = ndb_filter_elements_data(state->filter, mid); 1059 assert(mid_id); 1060 1061 return memcmp(state->key, mid_id, 32); 1062 } 1063 1064 static int compare_kinds(const void *pa, const void *pb) 1065 { 1066 1067 // NOTE: this should match type in `union 
ndb_filter_element` 1068 uint64_t a = *(uint64_t *)pa; 1069 uint64_t b = *(uint64_t *)pb; 1070 1071 if (a < b) { 1072 return -1; 1073 } else if (a > b) { 1074 return 1; 1075 } else { 1076 return 0; 1077 } 1078 } 1079 1080 // 1081 // returns 1 if a filter matches a note 1082 static int ndb_filter_matches_with(struct ndb_filter *filter, 1083 struct ndb_note *note, int already_matched) 1084 { 1085 int i, j; 1086 struct ndb_filter_elements *els; 1087 struct search_id_state state; 1088 1089 state.filter = filter; 1090 1091 for (i = 0; i < filter->num_elements; i++) { 1092 els = ndb_filter_get_elements(filter, i); 1093 state.els = els; 1094 assert(els); 1095 1096 // if we know we already match from a query scan result, 1097 // we can skip this check 1098 if ((1 << els->field.type) & already_matched) 1099 continue; 1100 1101 switch (els->field.type) { 1102 case NDB_FILTER_KINDS: 1103 for (j = 0; j < els->count; j++) { 1104 if ((unsigned int)els->elements[j] == note->kind) 1105 goto cont; 1106 } 1107 break; 1108 case NDB_FILTER_IDS: 1109 state.key = ndb_note_id(note); 1110 if (bsearch(&state, &els->elements[0], els->count, 1111 sizeof(els->elements[0]), search_ids)) { 1112 continue; 1113 } 1114 break; 1115 case NDB_FILTER_AUTHORS: 1116 state.key = ndb_note_pubkey(note); 1117 if (bsearch(&state, &els->elements[0], els->count, 1118 sizeof(els->elements[0]), search_ids)) { 1119 continue; 1120 } 1121 break; 1122 case NDB_FILTER_TAGS: 1123 if (ndb_tag_filter_matches(filter, els, note)) 1124 continue; 1125 break; 1126 case NDB_FILTER_SINCE: 1127 assert(els->count == 1); 1128 if (note->created_at >= els->elements[0]) 1129 continue; 1130 break; 1131 case NDB_FILTER_UNTIL: 1132 assert(els->count == 1); 1133 if (note->created_at < els->elements[0]) 1134 continue; 1135 case NDB_FILTER_LIMIT: 1136 cont: 1137 continue; 1138 } 1139 1140 // all need to match 1141 return 0; 1142 } 1143 1144 return 1; 1145 } 1146 1147 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 
1148 { 1149 return ndb_filter_matches_with(filter, note, 0); 1150 } 1151 1152 // because elements are stored as offsets and qsort doesn't support context, 1153 // we do a dumb thing where we convert elements to pointers and back if we 1154 // are doing an id or string sort 1155 static void sort_filter_elements(struct ndb_filter *filter, 1156 struct ndb_filter_elements *els, 1157 int (*cmp)(const void *, const void *)) 1158 { 1159 int i; 1160 1161 assert(ndb_filter_elem_is_ptr(&els->field)); 1162 1163 for (i = 0; i < els->count; i++) 1164 els->elements[i] += (uint64_t)filter->data_buf.start; 1165 1166 qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp); 1167 1168 for (i = 0; i < els->count; i++) 1169 els->elements[i] -= (uint64_t)filter->data_buf.start; 1170 } 1171 1172 static int ndb_filter_field_eq(struct ndb_filter *a_filt, 1173 struct ndb_filter_elements *a_field, 1174 struct ndb_filter *b_filt, 1175 struct ndb_filter_elements *b_field) 1176 { 1177 int i; 1178 const char *a_str, *b_str; 1179 unsigned char *a_id, *b_id; 1180 uint64_t a_int, b_int; 1181 1182 if (a_field->count != b_field->count) 1183 return 0; 1184 1185 if (a_field->field.type != b_field->field.type) { 1186 ndb_debug("UNUSUAL: field types do not match in ndb_filter_field_eq\n"); 1187 return 0; 1188 } 1189 1190 if (a_field->field.elem_type != b_field->field.elem_type) { 1191 ndb_debug("UNUSUAL: field element types do not match in ndb_filter_field_eq\n"); 1192 return 0; 1193 } 1194 1195 if (a_field->field.elem_type == NDB_ELEMENT_UNKNOWN) { 1196 ndb_debug("UNUSUAL: field element types are unknown\n"); 1197 return 0; 1198 } 1199 1200 for (i = 0; i < a_field->count; i++) { 1201 switch (a_field->field.elem_type) { 1202 case NDB_ELEMENT_UNKNOWN: 1203 return 0; 1204 case NDB_ELEMENT_STRING: 1205 a_str = ndb_filter_get_string_element(a_filt, a_field, i); 1206 b_str = ndb_filter_get_string_element(b_filt, b_field, i); 1207 if (strcmp(a_str, b_str)) 1208 return 0; 1209 break; 1210 case 
NDB_ELEMENT_ID: 1211 a_id = ndb_filter_get_id_element(a_filt, a_field, i); 1212 b_id = ndb_filter_get_id_element(b_filt, b_field, i); 1213 if (memcmp(a_id, b_id, 32)) 1214 return 0; 1215 break; 1216 case NDB_ELEMENT_INT: 1217 a_int = ndb_filter_get_int_element(a_field, i); 1218 b_int = ndb_filter_get_int_element(b_field, i); 1219 if (a_int != b_int) 1220 return 0; 1221 break; 1222 } 1223 } 1224 1225 return 1; 1226 } 1227 1228 void ndb_filter_end_field(struct ndb_filter *filter) 1229 { 1230 int cur_offset; 1231 struct ndb_filter_elements *cur; 1232 1233 cur_offset = filter->current; 1234 1235 if (!(cur = ndb_filter_current_element(filter))) 1236 return; 1237 1238 filter->elements[filter->num_elements++] = cur_offset; 1239 1240 // sort elements for binary search 1241 switch (cur->field.type) { 1242 case NDB_FILTER_IDS: 1243 case NDB_FILTER_AUTHORS: 1244 sort_filter_elements(filter, cur, compare_ids); 1245 break; 1246 case NDB_FILTER_KINDS: 1247 qsort(&cur->elements[0], cur->count, 1248 sizeof(cur->elements[0]), compare_kinds); 1249 break; 1250 case NDB_FILTER_TAGS: 1251 // TODO: generic tag search sorting 1252 break; 1253 case NDB_FILTER_SINCE: 1254 case NDB_FILTER_UNTIL: 1255 case NDB_FILTER_LIMIT: 1256 // don't need to sort these 1257 break; 1258 } 1259 1260 filter->current = -1; 1261 1262 } 1263 1264 static void ndb_filter_group_init(struct ndb_filter_group *group) 1265 { 1266 group->num_filters = 0; 1267 } 1268 1269 static int ndb_filter_group_add(struct ndb_filter_group *group, 1270 struct ndb_filter *filter) 1271 { 1272 if (group->num_filters + 1 > MAX_FILTERS) 1273 return 0; 1274 1275 return ndb_filter_clone(&group->filters[group->num_filters++], filter); 1276 } 1277 1278 static int ndb_filter_group_matches(struct ndb_filter_group *group, 1279 struct ndb_note *note) 1280 { 1281 int i; 1282 struct ndb_filter *filter; 1283 1284 if (group->num_filters == 0) 1285 return 1; 1286 1287 for (i = 0; i < group->num_filters; i++) { 1288 filter = &group->filters[i]; 1289 
1290 if (ndb_filter_matches(filter, note)) 1291 return 1; 1292 } 1293 1294 return 0; 1295 } 1296 1297 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 1298 uint64_t timestamp, const char *search) 1299 { 1300 memcpy(key->id, id, 32); 1301 key->timestamp = timestamp; 1302 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1303 key->search[sizeof(key->search) - 1] = '\0'; 1304 } 1305 1306 static int ndb_write_profile_search_index(struct ndb_txn *txn, 1307 struct ndb_search_key *index_key, 1308 uint64_t profile_key) 1309 { 1310 int rc; 1311 MDB_val key, val; 1312 1313 key.mv_data = index_key; 1314 key.mv_size = sizeof(*index_key); 1315 val.mv_data = &profile_key; 1316 val.mv_size = sizeof(profile_key); 1317 1318 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1319 &key, &val, 0))) 1320 { 1321 ndb_debug("ndb_write_profile_search_index failed: %s\n", 1322 mdb_strerror(rc)); 1323 return 0; 1324 } 1325 1326 return 1; 1327 } 1328 1329 1330 // map usernames and display names to profile keys for user searching 1331 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 1332 struct ndb_note *note, 1333 uint64_t profile_key, 1334 void *profile_root) 1335 { 1336 struct ndb_search_key index; 1337 NdbProfileRecord_table_t profile_record; 1338 NdbProfile_table_t profile; 1339 1340 profile_record = NdbProfileRecord_as_root(profile_root); 1341 profile = NdbProfileRecord_profile_get(profile_record); 1342 1343 const char *name = NdbProfile_name_get(profile); 1344 const char *display_name = NdbProfile_display_name_get(profile); 1345 1346 // words + pubkey + created 1347 if (name) { 1348 ndb_make_search_key(&index, note->pubkey, note->created_at, 1349 name); 1350 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1351 return 0; 1352 } 1353 1354 if (display_name) { 1355 // don't write the same name/display_name twice 1356 if (name && !strcmp(display_name, name)) { 1357 return 1; 1358 } 1359 
ndb_make_search_key(&index, note->pubkey, note->created_at, 1360 display_name); 1361 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1362 return 0; 1363 } 1364 1365 return 1; 1366 } 1367 1368 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1369 uint64_t timestamp) 1370 { 1371 memcpy(key->id, id, 32); 1372 key->timestamp = timestamp; 1373 } 1374 1375 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1376 { 1377 memcpy(key->id, id, 32); 1378 key->timestamp = 0; 1379 } 1380 1381 static inline void ndb_u64_ts_init(struct ndb_u64_ts *key, uint64_t integer, 1382 uint64_t timestamp) 1383 { 1384 key->u64 = integer; 1385 key->timestamp = timestamp; 1386 } 1387 1388 // useful for range-searching for the latest key with a clustered created_at timen 1389 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1390 { 1391 memcpy(key->id, id, 32); 1392 key->timestamp = UINT64_MAX; 1393 } 1394 1395 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 1396 { 1397 txn->lmdb = &ndb->lmdb; 1398 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 1399 if (!txn->lmdb->env) 1400 return 0; 1401 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 1402 } 1403 1404 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 1405 { 1406 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 1407 } 1408 1409 // this should only be used in migrations, etc 1410 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 1411 { 1412 return _ndb_begin_query(ndb, txn, 0); 1413 } 1414 1415 static int ndb_db_is_index(enum ndb_dbs index) 1416 { 1417 switch (index) { 1418 case NDB_DB_NOTE: 1419 case NDB_DB_META: 1420 case NDB_DB_PROFILE: 1421 case NDB_DB_NOTE_ID: 1422 case NDB_DB_NDB_META: 1423 case NDB_DB_PROFILE_SEARCH: 1424 case NDB_DB_PROFILE_LAST_FETCH: 1425 case NDB_DBS: 1426 return 0; 1427 case NDB_DB_PROFILE_PK: 1428 case NDB_DB_NOTE_KIND: 1429 case NDB_DB_NOTE_TEXT: 1430 case 
NDB_DB_NOTE_BLOCKS: 1431 case NDB_DB_NOTE_TAGS: 1432 case NDB_DB_NOTE_PUBKEY: 1433 case NDB_DB_NOTE_PUBKEY_KIND: 1434 return 1; 1435 } 1436 1437 return 0; 1438 } 1439 1440 static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key, 1441 unsigned char *id, uint64_t iu64, 1442 uint64_t timestamp) 1443 { 1444 memcpy(key->id, id, 32); 1445 key->u64 = iu64; 1446 key->timestamp = timestamp; 1447 } 1448 1449 static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note, 1450 uint64_t note_key) 1451 { 1452 int rc; 1453 struct ndb_tsid key; 1454 MDB_val k, v; 1455 1456 ndb_tsid_init(&key, ndb_note_pubkey(note), ndb_note_created_at(note)); 1457 1458 k.mv_data = &key; 1459 k.mv_size = sizeof(key); 1460 1461 v.mv_data = ¬e_key; 1462 v.mv_size = sizeof(note_key); 1463 1464 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY], &k, &v, 0))) { 1465 fprintf(stderr, "write note pubkey index failed: %s\n", 1466 mdb_strerror(rc)); 1467 return 0; 1468 } 1469 1470 return 1; 1471 } 1472 1473 static int ndb_write_note_pubkey_kind_index(struct ndb_txn *txn, 1474 struct ndb_note *note, 1475 uint64_t note_key) 1476 { 1477 int rc; 1478 struct ndb_id_u64_ts key; 1479 MDB_val k, v; 1480 1481 ndb_id_u64_ts_init(&key, ndb_note_pubkey(note), ndb_note_kind(note), 1482 ndb_note_created_at(note)); 1483 1484 k.mv_data = &key; 1485 k.mv_size = sizeof(key); 1486 1487 v.mv_data = ¬e_key; 1488 v.mv_size = sizeof(note_key); 1489 1490 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], &k, &v, 0))) { 1491 fprintf(stderr, "write note pubkey_kind index failed: %s\n", 1492 mdb_strerror(rc)); 1493 return 0; 1494 } 1495 1496 return 1; 1497 } 1498 1499 1500 static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, int num_indices) 1501 { 1502 MDB_val k, v; 1503 MDB_cursor *cur; 1504 int i, drop_dbi, count, rc; 1505 uint64_t note_key; 1506 struct ndb_note *note; 1507 enum ndb_dbs index; 1508 1509 // 0 means empty, not delete the dbi 
1510 drop_dbi = 0; 1511 1512 // ensure they are all index dbs 1513 for (i = 0; i < num_indices; i++) { 1514 if (!ndb_db_is_index(indices[i])) { 1515 fprintf(stderr, "ndb_rebuild_note_index: %s is not an index db\n", ndb_db_name(index)); 1516 return -1; 1517 } 1518 } 1519 1520 // empty the index dbs before we rebuild 1521 for (i = 0; i < num_indices; i++) { 1522 index = indices[i]; 1523 if (mdb_drop(txn->mdb_txn, index, drop_dbi)) { 1524 fprintf(stderr, "ndb_rebuild_pubkey_index: mdb_drop failed for %s\n", ndb_db_name(index)); 1525 return -1; 1526 } 1527 } 1528 1529 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE], &cur))) { 1530 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1531 return -1; 1532 } 1533 1534 count = 0; 1535 1536 // loop through all notes and write search indices 1537 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1538 note = v.mv_data; 1539 note_key = *((uint64_t*)k.mv_data); 1540 1541 for (i = 0; i < num_indices; i++) { 1542 index = indices[i]; 1543 switch (index) { 1544 case NDB_DB_NOTE: 1545 case NDB_DB_META: 1546 case NDB_DB_PROFILE: 1547 case NDB_DB_NOTE_ID: 1548 case NDB_DB_NDB_META: 1549 case NDB_DB_PROFILE_SEARCH: 1550 case NDB_DB_PROFILE_LAST_FETCH: 1551 case NDB_DBS: 1552 // this should never happen since we check at 1553 // the start 1554 count = -1; 1555 goto cleanup; 1556 case NDB_DB_PROFILE_PK: 1557 case NDB_DB_NOTE_KIND: 1558 case NDB_DB_NOTE_TEXT: 1559 case NDB_DB_NOTE_BLOCKS: 1560 case NDB_DB_NOTE_TAGS: 1561 fprintf(stderr, "%s index rebuild not supported yet. 
sorry.\n", ndb_db_name(index)); 1562 count = -1; 1563 goto cleanup; 1564 case NDB_DB_NOTE_PUBKEY: 1565 if (!ndb_write_note_pubkey_index(txn, note, note_key)) { 1566 count = -1; 1567 goto cleanup; 1568 } 1569 break; 1570 case NDB_DB_NOTE_PUBKEY_KIND: 1571 if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) { 1572 count = -1; 1573 goto cleanup; 1574 } 1575 break; 1576 } 1577 } 1578 1579 count++; 1580 } 1581 1582 cleanup: 1583 mdb_cursor_close(cur); 1584 1585 return count; 1586 } 1587 1588 1589 // Migrations 1590 // 1591 1592 // This was before we had note_profile_pubkey{,_kind} indices. Let's create them. 1593 static int ndb_migrate_profile_indices(struct ndb_txn *txn) 1594 { 1595 int count; 1596 1597 enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; 1598 if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) { 1599 fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); 1600 return 1; 1601 } else { 1602 fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); 1603 return 0; 1604 } 1605 } 1606 1607 static int ndb_migrate_user_search_indices(struct ndb_txn *txn) 1608 { 1609 int rc; 1610 MDB_cursor *cur; 1611 MDB_val k, v; 1612 void *profile_root; 1613 NdbProfileRecord_table_t record; 1614 struct ndb_note *note; 1615 uint64_t note_key, profile_key; 1616 size_t len; 1617 int count; 1618 1619 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1620 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1621 return 0; 1622 } 1623 1624 count = 0; 1625 1626 // loop through all profiles and write search indices 1627 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1628 profile_root = v.mv_data; 1629 profile_key = *((uint64_t*)k.mv_data); 1630 record = NdbProfileRecord_as_root(profile_root); 1631 note_key = NdbProfileRecord_note_key(record); 1632 note = ndb_get_note_by_key(txn, note_key, &len); 1633 
1634 if (note == NULL) { 1635 continue; 1636 } 1637 1638 if (!ndb_write_profile_search_indices(txn, note, profile_key, 1639 profile_root)) { 1640 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 1641 continue; 1642 } 1643 1644 count++; 1645 } 1646 1647 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 1648 1649 mdb_cursor_close(cur); 1650 1651 return 1; 1652 } 1653 1654 static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn) 1655 { 1656 // just drop the search db so we can rebuild it 1657 if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) { 1658 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 1659 return 0; 1660 } 1661 1662 return ndb_migrate_user_search_indices(txn); 1663 } 1664 1665 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 1666 1667 1668 int ndb_db_version(struct ndb_txn *txn) 1669 { 1670 uint64_t version, version_key; 1671 MDB_val k, v; 1672 1673 version_key = NDB_META_KEY_VERSION; 1674 k.mv_data = &version_key; 1675 k.mv_size = sizeof(version_key); 1676 1677 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) { 1678 version = -1; 1679 } else { 1680 if (v.mv_size != 8) { 1681 fprintf(stderr, "run_migrations: invalid version size?"); 1682 return 0; 1683 } 1684 version = *((uint64_t*)v.mv_data); 1685 } 1686 1687 return version; 1688 } 1689 1690 // custom pubkey+kind+timestamp comparison function. 
This is used by lmdb to 1691 // perform b+ tree searches over the pubkey+kind+timestamp index 1692 static int ndb_id_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1693 { 1694 struct ndb_id_u64_ts *tsa, *tsb; 1695 MDB_val a2 = *a, b2 = *b; 1696 1697 a2.mv_size = sizeof(tsa->id); 1698 b2.mv_size = sizeof(tsb->id); 1699 1700 int cmp = mdb_cmp_memn(&a2, &b2); 1701 if (cmp) return cmp; 1702 1703 tsa = a->mv_data; 1704 tsb = b->mv_data; 1705 1706 if (tsa->u64 < tsb->u64) 1707 return -1; 1708 else if (tsa->u64 > tsb->u64) 1709 return 1; 1710 1711 if (tsa->timestamp < tsb->timestamp) 1712 return -1; 1713 else if (tsa->timestamp > tsb->timestamp) 1714 return 1; 1715 1716 return 0; 1717 } 1718 1719 // custom kind+timestamp comparison function. This is used by lmdb to perform 1720 // b+ tree searches over the kind+timestamp index 1721 static int ndb_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1722 { 1723 struct ndb_u64_ts *tsa, *tsb; 1724 tsa = a->mv_data; 1725 tsb = b->mv_data; 1726 1727 if (tsa->u64 < tsb->u64) 1728 return -1; 1729 else if (tsa->u64 > tsb->u64) 1730 return 1; 1731 1732 if (tsa->timestamp < tsb->timestamp) 1733 return -1; 1734 else if (tsa->timestamp > tsb->timestamp) 1735 return 1; 1736 1737 return 0; 1738 } 1739 1740 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1741 { 1742 struct ndb_tsid *tsa, *tsb; 1743 MDB_val a2 = *a, b2 = *b; 1744 1745 a2.mv_size = sizeof(tsa->id); 1746 b2.mv_size = sizeof(tsb->id); 1747 1748 int cmp = mdb_cmp_memn(&a2, &b2); 1749 if (cmp) return cmp; 1750 1751 tsa = a->mv_data; 1752 tsb = b->mv_data; 1753 1754 if (tsa->timestamp < tsb->timestamp) 1755 return -1; 1756 else if (tsa->timestamp > tsb->timestamp) 1757 return 1; 1758 return 0; 1759 } 1760 1761 enum ndb_ingester_msgtype { 1762 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1763 NDB_INGEST_QUIT, // kill ingester thread immediately 1764 }; 1765 1766 struct ndb_ingester_event { 1767 char *json; 1768 unsigned client : 1; // 
["EVENT", {...}] messages 1769 unsigned len : 31; 1770 }; 1771 1772 struct ndb_writer_note { 1773 struct ndb_note *note; 1774 size_t note_len; 1775 }; 1776 1777 struct ndb_writer_profile { 1778 struct ndb_writer_note note; 1779 struct ndb_profile_record_builder record; 1780 }; 1781 1782 struct ndb_ingester_msg { 1783 enum ndb_ingester_msgtype type; 1784 union { 1785 struct ndb_ingester_event event; 1786 }; 1787 }; 1788 1789 struct ndb_writer_ndb_meta { 1790 // these are 64 bit because I'm paranoid of db-wide alignment issues 1791 uint64_t version; 1792 }; 1793 1794 // Used in the writer thread when writing ndb_profile_fetch_record's 1795 // kv = pubkey: recor 1796 struct ndb_writer_last_fetch { 1797 unsigned char pubkey[32]; 1798 uint64_t fetched_at; 1799 }; 1800 1801 // write note blocks 1802 struct ndb_writer_blocks { 1803 struct ndb_blocks *blocks; 1804 uint64_t note_key; 1805 }; 1806 1807 // The different types of messages that the writer thread can write to the 1808 // database 1809 struct ndb_writer_msg { 1810 enum ndb_writer_msgtype type; 1811 union { 1812 struct ndb_writer_note note; 1813 struct ndb_writer_profile profile; 1814 struct ndb_writer_ndb_meta ndb_meta; 1815 struct ndb_writer_last_fetch last_fetch; 1816 struct ndb_writer_blocks blocks; 1817 }; 1818 }; 1819 1820 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 1821 struct ndb_writer_msg *msg) 1822 { 1823 return prot_queue_push(&writer->inbox, msg); 1824 } 1825 1826 static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags); 1827 static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn) 1828 { 1829 int rc; 1830 MDB_cursor *cur; 1831 MDB_val k, v; 1832 void *profile_root; 1833 NdbProfileRecord_table_t record; 1834 struct ndb_note *note, *copied_note; 1835 uint64_t note_key; 1836 size_t len; 1837 int count, failed, ret; 1838 struct ndb_writer_profile profile; 1839 1840 if 
((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1841 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 1842 return 0; 1843 } 1844 1845 size_t scratch_size = 8 * 1024 * 1024; 1846 unsigned char *scratch = malloc(scratch_size); 1847 1848 ret = 1; 1849 count = 0; 1850 failed = 0; 1851 1852 // loop through all profiles and write search indices 1853 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1854 profile_root = v.mv_data; 1855 record = NdbProfileRecord_as_root(profile_root); 1856 note_key = NdbProfileRecord_note_key(record); 1857 note = ndb_get_note_by_key(txn, note_key, &len); 1858 1859 if (note == NULL) { 1860 failed++; 1861 continue; 1862 } 1863 1864 struct ndb_profile_record_builder *b = &profile.record; 1865 1866 // reprocess profile 1867 if (!ndb_process_profile_note(note, b)) { 1868 failed++; 1869 continue; 1870 } 1871 1872 // the writer needs to own this note, and its expected to free it 1873 copied_note = malloc(len); 1874 memcpy(copied_note, note, len); 1875 1876 profile.note.note = copied_note; 1877 profile.note.note_len = len; 1878 1879 // we don't pass in flags when migrating... a bit sketchy but 1880 // whatever. 
noone is using this to customize nostrdb atm 1881 if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) { 1882 count++; 1883 } 1884 } 1885 1886 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 1887 1888 if (failed != 0) { 1889 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 1890 } 1891 1892 free(scratch); 1893 mdb_cursor_close(cur); 1894 1895 return ret; 1896 } 1897 1898 static struct ndb_migration MIGRATIONS[] = { 1899 { .fn = ndb_migrate_user_search_indices }, 1900 { .fn = ndb_migrate_lower_user_search_indices }, 1901 { .fn = ndb_migrate_utf8_profile_names }, 1902 { .fn = ndb_migrate_profile_indices }, 1903 }; 1904 1905 1906 int ndb_end_query(struct ndb_txn *txn) 1907 { 1908 // this works on read or write queries. 1909 return mdb_txn_commit(txn->mdb_txn) == 0; 1910 } 1911 1912 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 1913 unsigned char sig[64]) 1914 { 1915 secp256k1_xonly_pubkey xonly_pubkey; 1916 int ok; 1917 1918 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 1919 pubkey) != 0; 1920 if (!ok) return 0; 1921 1922 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 1923 &xonly_pubkey) > 0; 1924 if (!ok) return 0; 1925 1926 return 1; 1927 } 1928 1929 static int ndb_writer_queue_note(struct ndb_writer *writer, 1930 struct ndb_note *note, size_t note_len) 1931 { 1932 struct ndb_writer_msg msg; 1933 msg.type = NDB_WRITER_NOTE; 1934 1935 msg.note.note = note; 1936 msg.note.note_len = note_len; 1937 1938 return prot_queue_push(&writer->inbox, &msg); 1939 } 1940 1941 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 1942 const unsigned char *pubkey, 1943 uint64_t fetched_at) 1944 { 1945 int rc; 1946 MDB_val key, val; 1947 1948 key.mv_data = (unsigned char*)pubkey; 1949 key.mv_size = 32; 1950 val.mv_data = &fetched_at; 1951 val.mv_size = sizeof(fetched_at); 1952 1953 if ((rc = 
mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 1954 &key, &val, 0))) 1955 { 1956 ndb_debug("write version to ndb_meta failed: %s\n", 1957 mdb_strerror(rc)); 1958 return; 1959 } 1960 1961 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 1962 } 1963 1964 1965 // We just received a profile that we haven't processed yet, but it could 1966 // be an older one! Make sure we only write last fetched profile if it's a new 1967 // one 1968 // 1969 // To do this, we first check the latest profile in the database. If the 1970 // created_date for this profile note is newer, then we write a 1971 // last_profile_fetch record, otherwise we do not. 1972 // 1973 // WARNING: This function is only valid when called from the writer thread 1974 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 1975 struct ndb_note *note) 1976 { 1977 size_t len; 1978 uint64_t profile_key, note_key; 1979 void *root; 1980 struct ndb_note *last_profile; 1981 NdbProfileRecord_table_t record; 1982 1983 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 1984 record = NdbProfileRecord_as_root(root); 1985 note_key = NdbProfileRecord_note_key(record); 1986 last_profile = ndb_get_note_by_key(txn, note_key, &len); 1987 if (last_profile == NULL) { 1988 return 0; 1989 } 1990 1991 // found profile, let's see if it's newer than ours 1992 if (note->created_at > last_profile->created_at) { 1993 // this is a new profile note, record last fetched time 1994 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1995 } 1996 } else { 1997 // couldn't fetch profile. 
record last fetched time 1998 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 1999 } 2000 2001 return 1; 2002 } 2003 2004 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 2005 uint64_t fetched_at) 2006 { 2007 struct ndb_writer_msg msg; 2008 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 2009 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 2010 msg.last_fetch.fetched_at = fetched_at; 2011 2012 return ndb_writer_queue_msg(&ndb->writer, &msg); 2013 } 2014 2015 2016 // When doing cursor scans from greatest to lowest, this function positions the 2017 // cursor at the first element before descending. MDB_SET_RANGE puts us right 2018 // after the first element, so we have to go back one. 2019 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v) 2020 { 2021 // Position cursor at the next key greater than or equal to the 2022 // specified key 2023 if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) { 2024 // Failed :(. It could be the last element? 
2025 if (mdb_cursor_get(cur, k, v, MDB_LAST)) 2026 return 0; 2027 } else { 2028 // if set range worked and our key exists, it should be 2029 // the one right before this one 2030 if (mdb_cursor_get(cur, k, v, MDB_PREV)) 2031 return 0; 2032 } 2033 2034 return 1; 2035 } 2036 2037 // get some value based on a clustered id key 2038 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 2039 MDB_val *val) 2040 { 2041 MDB_val k, v; 2042 MDB_cursor *cur; 2043 int success = 0, rc; 2044 struct ndb_tsid tsid; 2045 2046 // position at the most recent 2047 ndb_tsid_high(&tsid, id); 2048 2049 k.mv_data = &tsid; 2050 k.mv_size = sizeof(tsid); 2051 2052 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 2053 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 2054 return 0; 2055 } 2056 2057 if (!ndb_cursor_start(cur, &k, &v)) 2058 goto cleanup; 2059 2060 if (memcmp(k.mv_data, id, 32) == 0) { 2061 *val = v; 2062 success = 1; 2063 } 2064 2065 cleanup: 2066 mdb_cursor_close(cur); 2067 return success; 2068 } 2069 2070 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 2071 enum ndb_dbs store, size_t *len) 2072 { 2073 MDB_val k, v; 2074 2075 k.mv_data = &key; 2076 k.mv_size = sizeof(key); 2077 2078 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2079 ndb_debug("ndb_lookup_by_key: mdb_get note failed\n"); 2080 return NULL; 2081 } 2082 2083 if (len) 2084 *len = v.mv_size; 2085 2086 return v.mv_data; 2087 } 2088 2089 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 2090 enum ndb_dbs store, const unsigned char *pk, 2091 size_t *len, uint64_t *primkey) 2092 { 2093 MDB_val k, v; 2094 void *res = NULL; 2095 if (len) 2096 *len = 0; 2097 2098 if (!ndb_get_tsid(txn, ind, pk, &k)) { 2099 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 2100 return 0; 2101 } 2102 2103 if (primkey) 2104 *primkey = *(uint64_t*)k.mv_data; 2105 2106 if (mdb_get(txn->mdb_txn, 
txn->lmdb->dbs[store], &k, &v)) { 2107 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 2108 return 0; 2109 } 2110 2111 res = v.mv_data; 2112 assert(((uint64_t)res % 4) == 0); 2113 if (len) 2114 *len = v.mv_size; 2115 return res; 2116 } 2117 2118 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 2119 { 2120 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 2121 } 2122 2123 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 2124 { 2125 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key); 2126 } 2127 2128 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 2129 enum ndb_dbs db, 2130 const unsigned char *id) 2131 { 2132 MDB_val k; 2133 2134 if (!ndb_get_tsid(txn, db, id, &k)) 2135 return 0; 2136 2137 return *(uint32_t*)k.mv_data; 2138 } 2139 2140 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 2141 { 2142 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 2143 } 2144 2145 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 2146 { 2147 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 2148 } 2149 2150 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2151 { 2152 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 2153 } 2154 2155 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2156 { 2157 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 2158 } 2159 2160 uint64_t 2161 ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 2162 { 2163 MDB_val k, v; 2164 2165 k.mv_data = (unsigned char*)pubkey; 2166 k.mv_size = 32; 2167 2168 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 2169 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 2170 return 0; 2171 } 2172 2173 return 
*((uint64_t*)v.mv_data); 2174 } 2175 2176 2177 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 2178 { 2179 MDB_val val; 2180 2181 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 2182 return 0; 2183 2184 return 1; 2185 } 2186 2187 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 2188 MDB_txn *mdb_txn) 2189 { 2190 txn->lmdb = lmdb; 2191 txn->mdb_txn = mdb_txn; 2192 } 2193 2194 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 2195 { 2196 unsigned char id[32]; 2197 struct ndb_ingest_controller *c = data; 2198 struct ndb_txn txn; 2199 2200 hex_decode(hexid, 64, id, sizeof(id)); 2201 2202 // let's see if we already have it 2203 2204 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 2205 if (!ndb_has_note(&txn, id)) 2206 return NDB_IDRES_CONT; 2207 2208 return NDB_IDRES_STOP; 2209 } 2210 2211 static int ndbprofile_parse_json(flatcc_builder_t *B, 2212 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 2213 { 2214 flatcc_json_parser_t parser, *ctx = &parser; 2215 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 2216 2217 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 2218 return 0; 2219 2220 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 2221 if (ctx->error) 2222 return 0; 2223 2224 if (!flatcc_builder_end_buffer(B, *profile)) 2225 return 0; 2226 2227 ctx->end_loc = buf; 2228 2229 2230 return 1; 2231 } 2232 2233 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 2234 { 2235 b->builder = malloc(sizeof(*b->builder)); 2236 b->flatbuf = NULL; 2237 } 2238 2239 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 2240 { 2241 if (b->builder) 2242 free(b->builder); 2243 if (b->flatbuf) 2244 free(b->flatbuf); 2245 2246 b->builder = NULL; 2247 b->flatbuf = NULL; 2248 } 2249 2250 int ndb_process_profile_note(struct ndb_note *note, 2251 struct ndb_profile_record_builder *profile) 2252 { 2253 int res; 2254 2255 
NdbProfile_ref_t profile_table; 2256 flatcc_builder_t *builder; 2257 2258 ndb_profile_record_builder_init(profile); 2259 builder = profile->builder; 2260 flatcc_builder_init(builder); 2261 2262 NdbProfileRecord_start_as_root(builder); 2263 2264 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 2265 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 2266 note->content_length, 2267 flatcc_json_parser_f_skip_unknown, 2268 &profile_table))) 2269 { 2270 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 2271 note->content_length, ndb_note_content(note)); 2272 ndb_profile_record_builder_free(profile); 2273 return 0; 2274 } 2275 2276 uint64_t received_at = time(NULL); 2277 const char *lnurl = "fixme"; 2278 2279 NdbProfileRecord_profile_add(builder, profile_table); 2280 NdbProfileRecord_received_at_add(builder, received_at); 2281 2282 flatcc_builder_ref_t lnurl_off; 2283 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 2284 2285 NdbProfileRecord_lnurl_add(builder, lnurl_off); 2286 2287 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 2288 return 1; 2289 } 2290 2291 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 2292 char *json, unsigned len, unsigned client) 2293 { 2294 struct ndb_ingester_msg msg; 2295 msg.type = NDB_INGEST_EVENT; 2296 2297 msg.event.json = json; 2298 msg.event.len = len; 2299 msg.event.client = client; 2300 2301 return threadpool_dispatch(&ingester->tp, &msg); 2302 } 2303 2304 2305 static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json, 2306 int len, unsigned client) 2307 { 2308 // Since we need to return as soon as possible, and we're not 2309 // making any assumptions about the lifetime of the string, we 2310 // definitely need to copy the json here. In the future once we 2311 // have our thread that manages a websocket connection, we can 2312 // avoid the copy and just use the buffer we get from that 2313 // thread. 
2314 char *json_copy = strdupn(json, len); 2315 if (json_copy == NULL) 2316 return 0; 2317 2318 return ndb_ingester_queue_event(ingester, json_copy, len, client); 2319 } 2320 2321 2322 static int ndb_ingester_process_note(secp256k1_context *ctx, 2323 struct ndb_note *note, 2324 size_t note_size, 2325 struct ndb_writer_msg *out, 2326 struct ndb_ingester *ingester) 2327 { 2328 enum ndb_ingest_filter_action action; 2329 action = NDB_INGEST_ACCEPT; 2330 2331 if (ingester->filter) 2332 action = ingester->filter(ingester->filter_context, note); 2333 2334 if (action == NDB_INGEST_REJECT) 2335 return 0; 2336 2337 // some special situations we might want to skip sig validation, 2338 // like during large imports 2339 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 2340 // if we're skipping validation we don't need to verify 2341 } else { 2342 // verify! If it's an invalid note we don't need to 2343 // bother writing it to the database 2344 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 2345 ndb_debug("signature verification failed\n"); 2346 return 0; 2347 } 2348 } 2349 2350 // we didn't find anything. 
let's send it 2351 // to the writer thread 2352 note = realloc(note, note_size); 2353 assert(((uint64_t)note % 4) == 0); 2354 2355 if (note->kind == 0) { 2356 struct ndb_profile_record_builder *b = 2357 &out->profile.record; 2358 2359 ndb_process_profile_note(note, b); 2360 2361 out->type = NDB_WRITER_PROFILE; 2362 out->profile.note.note = note; 2363 out->profile.note.note_len = note_size; 2364 return 1; 2365 } else if (note->kind == 6) { 2366 // process the repost if we have a repost event 2367 ndb_debug("processing kind 6 repost\n"); 2368 ndb_ingest_event(ingester, ndb_note_content(note), 2369 ndb_note_content_length(note), 0); 2370 } 2371 2372 out->type = NDB_WRITER_NOTE; 2373 out->note.note = note; 2374 out->note.note_len = note_size; 2375 2376 return 1; 2377 } 2378 2379 2380 static int ndb_ingester_process_event(secp256k1_context *ctx, 2381 struct ndb_ingester *ingester, 2382 struct ndb_ingester_event *ev, 2383 struct ndb_writer_msg *out, 2384 MDB_txn *read_txn 2385 ) 2386 { 2387 struct ndb_tce tce; 2388 struct ndb_fce fce; 2389 struct ndb_note *note; 2390 struct ndb_ingest_controller controller; 2391 struct ndb_id_cb cb; 2392 void *buf; 2393 int ok; 2394 size_t bufsize, note_size; 2395 2396 ok = 0; 2397 2398 // we will use this to check if we already have it in the DB during 2399 // ID parsing 2400 controller.read_txn = read_txn; 2401 controller.lmdb = ingester->lmdb; 2402 cb.fn = ndb_ingester_json_controller; 2403 cb.data = &controller; 2404 2405 // since we're going to be passing this allocated note to a different 2406 // thread, we can't use thread-local buffers. just allocate a block 2407 bufsize = max(ev->len * 8.0, 4096); 2408 buf = malloc(bufsize); 2409 if (!buf) { 2410 ndb_debug("couldn't malloc buf\n"); 2411 return 0; 2412 } 2413 2414 note_size = 2415 ev->client ? 
2416 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 2417 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 2418 2419 if ((int)note_size == -42) { 2420 // we already have this! 2421 //ndb_debug("already have id??\n"); 2422 goto cleanup; 2423 } else if (note_size == 0) { 2424 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 2425 goto cleanup; 2426 } 2427 2428 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 2429 2430 if (ev->client) { 2431 switch (fce.evtype) { 2432 case NDB_FCE_EVENT: 2433 note = fce.event.note; 2434 if (note != buf) { 2435 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2436 goto cleanup; 2437 } 2438 2439 if (!ndb_ingester_process_note(ctx, note, note_size, 2440 out, ingester)) { 2441 ndb_debug("failed to process note\n"); 2442 goto cleanup; 2443 } else { 2444 // we're done with the original json, free it 2445 free(ev->json); 2446 return 1; 2447 } 2448 } 2449 } else { 2450 switch (tce.evtype) { 2451 case NDB_TCE_AUTH: goto cleanup; 2452 case NDB_TCE_NOTICE: goto cleanup; 2453 case NDB_TCE_EOSE: goto cleanup; 2454 case NDB_TCE_OK: goto cleanup; 2455 case NDB_TCE_EVENT: 2456 note = tce.event.note; 2457 if (note != buf) { 2458 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2459 goto cleanup; 2460 } 2461 2462 if (!ndb_ingester_process_note(ctx, note, note_size, 2463 out, ingester)) { 2464 ndb_debug("failed to process note\n"); 2465 goto cleanup; 2466 } else { 2467 // we're done with the original json, free it 2468 free(ev->json); 2469 return 1; 2470 } 2471 } 2472 } 2473 2474 2475 cleanup: 2476 free(ev->json); 2477 free(buf); 2478 2479 return ok; 2480 } 2481 2482 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 2483 { 2484 MDB_cursor *mc; 2485 MDB_val key, val; 2486 2487 if (mdb_cursor_open(txn, db, &mc)) 2488 return 0; 2489 2490 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 2491 mdb_cursor_close(mc); 2492 return 0; 2493 } 2494 2495 
mdb_cursor_close(mc); 2496 2497 assert(key.mv_size == 8); 2498 return *((uint64_t*)key.mv_data); 2499 } 2500 2501 // 2502 // make a search key meant for user queries without any other note info 2503 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 2504 { 2505 memset(key->id, 0, sizeof(key->id)); 2506 key->timestamp = 0; 2507 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 2508 key->search[sizeof(key->search) - 1] = '\0'; 2509 } 2510 2511 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 2512 { 2513 int rc; 2514 struct ndb_search_key s; 2515 MDB_val k, v; 2516 search->cursor = NULL; 2517 2518 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 2519 2520 ndb_make_search_key_low(&s, query); 2521 2522 k.mv_data = &s; 2523 k.mv_size = sizeof(s); 2524 2525 if ((rc = mdb_cursor_open(txn->mdb_txn, 2526 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 2527 cursor))) { 2528 printf("search_profile: cursor opened failed: %s\n", 2529 mdb_strerror(rc)); 2530 return 0; 2531 } 2532 2533 // Position cursor at the next key greater than or equal to the specified key 2534 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 2535 printf("search_profile: cursor get failed\n"); 2536 goto cleanup; 2537 } else { 2538 search->key = k.mv_data; 2539 assert(v.mv_size == 8); 2540 search->profile_key = *((uint64_t*)v.mv_data); 2541 return 1; 2542 } 2543 2544 cleanup: 2545 mdb_cursor_close(search->cursor); 2546 search->cursor = NULL; 2547 return 0; 2548 } 2549 2550 void ndb_search_profile_end(struct ndb_search *search) 2551 { 2552 if (search->cursor) 2553 mdb_cursor_close(search->cursor); 2554 } 2555 2556 int ndb_search_profile_next(struct ndb_search *search) 2557 { 2558 int rc; 2559 MDB_val k, v; 2560 unsigned char *init_id; 2561 2562 init_id = search->key->id; 2563 k.mv_data = search->key; 2564 k.mv_size = sizeof(*search->key); 2565 2566 retry: 2567 if ((rc = mdb_cursor_get(search->cursor, &k, &v, 
MDB_NEXT))) { 2568 ndb_debug("ndb_search_profile_next: %s\n", 2569 mdb_strerror(rc)); 2570 return 0; 2571 } else { 2572 search->key = k.mv_data; 2573 assert(v.mv_size == 8); 2574 search->profile_key = *((uint64_t*)v.mv_data); 2575 2576 // skip duplicate pubkeys 2577 if (!memcmp(init_id, search->key->id, 32)) 2578 goto retry; 2579 } 2580 2581 return 1; 2582 } 2583 2584 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 2585 { 2586 int cmp; 2587 struct ndb_search_key *ska, *skb; 2588 2589 ska = a->mv_data; 2590 skb = b->mv_data; 2591 2592 MDB_val a2 = *a; 2593 MDB_val b2 = *b; 2594 2595 a2.mv_data = ska->search; 2596 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 2597 2598 cmp = mdb_cmp_memn(&a2, &b2); 2599 if (cmp) return cmp; 2600 2601 if (ska->timestamp < skb->timestamp) 2602 return -1; 2603 else if (ska->timestamp > skb->timestamp) 2604 return 1; 2605 2606 return 0; 2607 } 2608 2609 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 2610 2611 { 2612 MDB_val key, val; 2613 int rc; 2614 struct ndb_tsid tsid; 2615 MDB_dbi pk_db; 2616 2617 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 2618 2619 // write profile_pk + created_at index 2620 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 2621 2622 key.mv_data = &tsid; 2623 key.mv_size = sizeof(tsid); 2624 val.mv_data = &profile_key; 2625 val.mv_size = sizeof(profile_key); 2626 2627 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 2628 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 2629 profile_key, mdb_strerror(rc)); 2630 return 0; 2631 } 2632 2633 return 1; 2634 } 2635 2636 static int ndb_write_profile(struct ndb_txn *txn, 2637 struct ndb_writer_profile *profile, 2638 uint64_t note_key) 2639 { 2640 uint64_t profile_key; 2641 struct ndb_note *note; 2642 void *flatbuf; 2643 size_t flatbuf_len; 2644 int rc; 2645 2646 MDB_val key, val; 2647 MDB_dbi profile_db; 2648 2649 note = profile->note.note; 2650 2651 // add note_key to 
profile record 2652 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 2653 NdbProfileRecord_end_as_root(profile->record.builder); 2654 2655 flatbuf = profile->record.flatbuf = 2656 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 2657 2658 assert(((uint64_t)flatbuf % 8) == 0); 2659 2660 // TODO: this may not be safe!? 2661 flatbuf_len = (flatbuf_len + 7) & ~7; 2662 2663 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 2664 2665 // get dbs 2666 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 2667 2668 // get new key 2669 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 2670 2671 // write profile to profile store 2672 key.mv_data = &profile_key; 2673 key.mv_size = sizeof(profile_key); 2674 val.mv_data = flatbuf; 2675 val.mv_size = flatbuf_len; 2676 2677 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 2678 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 2679 return 0; 2680 } 2681 2682 // write last fetched record 2683 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 2684 ndb_debug("failed to write last profile fetched record\n"); 2685 } 2686 2687 // write profile pubkey index 2688 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 2689 ndb_debug("failed to write profile pubkey index\n"); 2690 return 0; 2691 } 2692 2693 // write name, display_name profile search indices 2694 if (!ndb_write_profile_search_indices(txn, note, profile_key, 2695 flatbuf)) { 2696 ndb_debug("failed to write profile search indices\n"); 2697 return 0; 2698 } 2699 2700 return 1; 2701 } 2702 2703 // find the last id tag in a note (e, p, etc) 2704 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 2705 { 2706 unsigned char *last = NULL; 2707 struct ndb_iterator iter; 2708 struct ndb_str str; 2709 2710 // get the liked event id (last id) 2711 ndb_tags_iterate_start(note, &iter); 2712 2713 while (ndb_tags_iterate_next(&iter)) { 2714 if (iter.tag->count < 
2) 2715 continue; 2716 2717 str = ndb_tag_str(note, iter.tag, 0); 2718 2719 // assign liked to the last e tag 2720 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 2721 str = ndb_tag_str(note, iter.tag, 1); 2722 if (str.flag == NDB_PACKED_ID) 2723 last = str.id; 2724 } 2725 } 2726 2727 return last; 2728 } 2729 2730 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 2731 { 2732 MDB_val k, v; 2733 2734 k.mv_data = (unsigned char*)id; 2735 k.mv_size = 32; 2736 2737 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 2738 //ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 2739 return NULL; 2740 } 2741 2742 if (len) 2743 *len = v.mv_size; 2744 2745 return v.mv_data; 2746 } 2747 2748 // When receiving a reaction note, look for the liked id and increase the 2749 // reaction counter in the note metadata database 2750 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 2751 { 2752 size_t len; 2753 void *root; 2754 int reactions, rc; 2755 MDB_val key, val; 2756 NdbEventMeta_table_t meta; 2757 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 2758 2759 if (liked == NULL) 2760 return 0; 2761 2762 root = ndb_get_note_meta(txn, liked, &len); 2763 2764 flatcc_builder_t builder; 2765 flatcc_builder_init(&builder); 2766 NdbEventMeta_start_as_root(&builder); 2767 2768 // no meta record, let's make one 2769 if (root == NULL) { 2770 NdbEventMeta_reactions_add(&builder, 1); 2771 } else { 2772 // clone existing and add to it 2773 meta = NdbEventMeta_as_root(root); 2774 2775 reactions = NdbEventMeta_reactions_get(meta); 2776 NdbEventMeta_clone(&builder, meta); 2777 NdbEventMeta_reactions_add(&builder, reactions + 1); 2778 } 2779 2780 NdbProfileRecord_end_as_root(&builder); 2781 root = flatcc_builder_finalize_aligned_buffer(&builder, &len); 2782 assert(((uint64_t)root % 8) == 0); 2783 2784 if (root == NULL) { 2785 ndb_debug("failed to create note metadata record\n"); 2786 return 0; 2787 } 2788 
2789 // metadata is keyed on id because we want to collect stats regardless 2790 // if we have the note yet or not 2791 key.mv_data = liked; 2792 key.mv_size = 32; 2793 2794 val.mv_data = root; 2795 val.mv_size = len; 2796 2797 // write the new meta record 2798 //ndb_debug("writing stats record for "); 2799 //print_hex(liked, 32); 2800 //ndb_debug("\n"); 2801 2802 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 2803 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 2804 free(root); 2805 return 0; 2806 } 2807 2808 free(root); 2809 2810 return 1; 2811 } 2812 2813 2814 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 2815 uint64_t note_key) 2816 2817 { 2818 struct ndb_tsid tsid; 2819 int rc; 2820 MDB_val key, val; 2821 MDB_dbi id_db; 2822 2823 ndb_tsid_init(&tsid, note->id, note->created_at); 2824 2825 key.mv_data = &tsid; 2826 key.mv_size = sizeof(tsid); 2827 val.mv_data = ¬e_key; 2828 val.mv_size = sizeof(note_key); 2829 2830 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 2831 2832 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 2833 ndb_debug("write note id index to db failed: %s\n", 2834 mdb_strerror(rc)); 2835 return 0; 2836 } 2837 2838 return 1; 2839 } 2840 2841 static int ndb_filter_group_add_filters(struct ndb_filter_group *group, 2842 struct ndb_filter *filters, 2843 int num_filters) 2844 { 2845 int i; 2846 2847 for (i = 0; i < num_filters; i++) { 2848 if (!ndb_filter_group_add(group, &filters[i])) 2849 return 0; 2850 } 2851 2852 return 1; 2853 } 2854 2855 2856 static struct ndb_filter_elements * 2857 ndb_filter_find_elements(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2858 { 2859 int i; 2860 struct ndb_filter_elements *els; 2861 2862 for (i = 0; i < filter->num_elements; i++) { 2863 els = ndb_filter_get_elements(filter, i); 2864 assert(els); 2865 if (els->field.type == typ) { 2866 return els; 2867 } 2868 } 2869 2870 return NULL; 2871 } 2872 2873 int 
ndb_filter_is_subset_of(const struct ndb_filter *a, const struct ndb_filter *b) 2874 { 2875 int i; 2876 struct ndb_filter_elements *b_field, *a_field; 2877 2878 // Everything is always a subset of {} 2879 if (b->num_elements == 0) 2880 return 1; 2881 2882 // We can't be a subset if the number of elements in the other 2883 // filter is larger then the number of elements we have. 2884 if (b->num_elements > a->num_elements) 2885 return 0; 2886 2887 // If our filter count matches, we can only be a subset if we are 2888 // equal 2889 if (b->num_elements == a->num_elements) 2890 return ndb_filter_eq(a, b); 2891 2892 // If our element count is larger than the other filter, then we 2893 // must see if every element in the other filter exists in ours. If 2894 // so, then we are a subset of the other. 2895 // 2896 // eg: B={k:1, a:b} <- A={t:x, k:1, a:b} 2897 // 2898 // A is a subset of B because `k:1` and `a:b` both exist in A 2899 2900 for (i = 0; i < b->num_elements; i++) { 2901 b_field = ndb_filter_get_elements((struct ndb_filter*)b, i); 2902 a_field = ndb_filter_find_elements((struct ndb_filter*)a, 2903 b_field->field.type); 2904 2905 if (a_field == NULL) 2906 return 0; 2907 2908 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_field, 2909 (struct ndb_filter*)b, b_field)) 2910 return 0; 2911 } 2912 2913 return 1; 2914 } 2915 2916 int ndb_filter_eq(const struct ndb_filter *a, const struct ndb_filter *b) 2917 { 2918 int i; 2919 struct ndb_filter_elements *a_els, *b_els; 2920 2921 if (a->num_elements != b->num_elements) 2922 return 0; 2923 2924 for (i = 0; i < a->num_elements; i++) { 2925 a_els = ndb_filter_get_elements((struct ndb_filter*)a, i); 2926 b_els = ndb_filter_find_elements((struct ndb_filter *)b, 2927 a_els->field.type); 2928 2929 if (b_els == NULL) 2930 return 0; 2931 2932 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_els, 2933 (struct ndb_filter*)b, b_els)) 2934 return 0; 2935 } 2936 2937 return 1; 2938 } 2939 2940 2941 static uint64_t * 2942 
ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 2943 { 2944 struct ndb_filter_elements *els; 2945 if ((els = ndb_filter_find_elements(filter, typ))) 2946 return &els->elements[0]; 2947 return NULL; 2948 } 2949 2950 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter, 2951 enum ndb_filter_fieldtype typ) 2952 { 2953 uint64_t *el; 2954 if (NULL == (el = ndb_filter_get_elem(filter, typ))) 2955 return NULL; 2956 return el; 2957 } 2958 2959 static inline int push_query_result(struct ndb_query_results *results, 2960 struct ndb_query_result *result) 2961 { 2962 return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result)); 2963 } 2964 2965 static int compare_query_results(const void *pa, const void *pb) 2966 { 2967 struct ndb_query_result *a, *b; 2968 2969 a = (struct ndb_query_result *)pa; 2970 b = (struct ndb_query_result *)pb; 2971 2972 if (a->note->created_at == b->note->created_at) { 2973 return 0; 2974 } else if (a->note->created_at > b->note->created_at) { 2975 return -1; 2976 } else { 2977 return 1; 2978 } 2979 } 2980 2981 static void ndb_query_result_init(struct ndb_query_result *res, 2982 struct ndb_note *note, 2983 uint64_t note_size, 2984 uint64_t note_id) 2985 { 2986 *res = (struct ndb_query_result){ 2987 .note_id = note_id, 2988 .note_size = note_size, 2989 .note = note, 2990 }; 2991 } 2992 2993 static int query_is_full(struct ndb_query_results *results, int limit) 2994 { 2995 if (results->cur.p >= results->cur.end) 2996 return 1; 2997 2998 return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit; 2999 } 3000 3001 static int ndb_query_plan_execute_ids(struct ndb_txn *txn, 3002 struct ndb_filter *filter, 3003 struct ndb_query_results *results, 3004 int limit 3005 ) 3006 { 3007 MDB_cursor *cur; 3008 MDB_dbi db; 3009 MDB_val k, v; 3010 int rc, i; 3011 struct ndb_filter_elements *ids; 3012 struct ndb_note *note; 3013 struct ndb_query_result res; 3014 struct ndb_tsid tsid, *ptsid; 3015 
uint64_t note_id, until, *pint; 3016 size_t note_size; 3017 unsigned char *id; 3018 3019 until = UINT64_MAX; 3020 3021 if (!(ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS))) 3022 return 0; 3023 3024 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3025 until = *pint; 3026 3027 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3028 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3029 return 0; 3030 3031 // for each id in our ids filter, find in the db 3032 for (i = 0; i < ids->count; i++) { 3033 if (query_is_full(results, limit)) 3034 break; 3035 3036 id = ndb_filter_get_id_element(filter, ids, i); 3037 ndb_tsid_init(&tsid, (unsigned char *)id, until); 3038 3039 k.mv_data = &tsid; 3040 k.mv_size = sizeof(tsid); 3041 3042 if (!ndb_cursor_start(cur, &k, &v)) 3043 continue; 3044 3045 ptsid = (struct ndb_tsid *)k.mv_data; 3046 note_id = *(uint64_t*)v.mv_data; 3047 3048 if (memcmp(id, ptsid->id, 32)) 3049 continue; 3050 3051 // get the note because we need it to match against the filter 3052 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3053 continue; 3054 3055 // Sure this particular lookup matched the index query, but 3056 // does it match the entire filter? Check! We also pass in 3057 // things we've already matched via the filter so we don't have 3058 // to check again. This can be pretty important for filters 3059 // with a large number of entries. 
3060 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS)) 3061 continue; 3062 3063 ndb_query_result_init(&res, note, note_size, note_id); 3064 if (!push_query_result(results, &res)) 3065 break; 3066 } 3067 3068 mdb_cursor_close(cur); 3069 return 1; 3070 } 3071 3072 // 3073 // encode a tag index key 3074 // 3075 // consists of: 3076 // 3077 // u8 tag 3078 // u8 tag_val_len 3079 // [u8] tag_val_bytes 3080 // u64 created_at 3081 // 3082 static int ndb_encode_tag_key(unsigned char *buf, int buf_size, 3083 char tag, const unsigned char *val, 3084 unsigned char val_len, 3085 uint64_t timestamp) 3086 { 3087 struct cursor writer; 3088 int ok; 3089 3090 // quick exit for obvious case where it will be too big. There can be 3091 // values of val_len that still fail, but we just let the writer handle 3092 // those failure cases 3093 if (val_len >= buf_size) 3094 return 0; 3095 3096 make_cursor(buf, buf + buf_size, &writer); 3097 3098 ok = 3099 cursor_push_byte(&writer, tag) && 3100 cursor_push(&writer, (unsigned char*)val, val_len) && 3101 cursor_push(&writer, (unsigned char*)×tamp, sizeof(timestamp)); 3102 3103 if (!ok) 3104 return 0; 3105 3106 return writer.p - writer.start; 3107 } 3108 3109 static int ndb_query_plan_execute_authors(struct ndb_txn *txn, 3110 struct ndb_filter *filter, 3111 struct ndb_query_results *results, 3112 int limit) 3113 { 3114 MDB_val k, v; 3115 MDB_cursor *cur; 3116 int rc, i; 3117 uint64_t *pint, until, since, note_key; 3118 unsigned char *author; 3119 struct ndb_note *note; 3120 size_t note_size; 3121 struct ndb_filter_elements *authors; 3122 struct ndb_query_result res; 3123 struct ndb_tsid tsid, *ptsid; 3124 enum ndb_dbs db; 3125 3126 db = txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY]; 3127 3128 if (!(authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS))) 3129 return 0; 3130 3131 until = UINT64_MAX; 3132 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3133 until = *pint; 3134 3135 since = 0; 3136 if ((pint = 
ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3137 since = *pint; 3138 3139 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3140 return 0; 3141 3142 for (i = 0; i < authors->count; i++) { 3143 author = ndb_filter_get_id_element(filter, authors, i); 3144 3145 ndb_tsid_init(&tsid, author, until); 3146 3147 k.mv_data = &tsid; 3148 k.mv_size = sizeof(tsid); 3149 3150 if (!ndb_cursor_start(cur, &k, &v)) 3151 continue; 3152 3153 // for each id in our ids filter, find in the db 3154 while (!query_is_full(results, limit)) { 3155 ptsid = (struct ndb_tsid *)k.mv_data; 3156 note_key = *(uint64_t*)v.mv_data; 3157 3158 // don't continue the scan if we're below `since` 3159 if (ptsid->timestamp < since) 3160 break; 3161 3162 // our author should match, if not bail 3163 if (memcmp(author, ptsid->id, 32)) 3164 break; 3165 3166 // fetch the note, we need it for our query results 3167 // and to match further against the filter 3168 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_size))) 3169 goto next; 3170 3171 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_AUTHORS)) 3172 goto next; 3173 3174 ndb_query_result_init(&res, note, note_size, note_key); 3175 if (!push_query_result(results, &res)) 3176 break; 3177 3178 next: 3179 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3180 break; 3181 } 3182 } 3183 3184 mdb_cursor_close(cur); 3185 return 1; 3186 } 3187 3188 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn, 3189 struct ndb_filter *filter, 3190 struct ndb_query_results *results, 3191 int limit) 3192 { 3193 MDB_dbi db; 3194 MDB_val k, v; 3195 MDB_cursor *cur; 3196 int rc; 3197 struct ndb_note *note; 3198 struct ndb_tsid key, *pkey; 3199 uint64_t *pint, until, since, note_id; 3200 size_t note_size; 3201 struct ndb_query_result res; 3202 unsigned char high_key[32] = {0xFF}; 3203 3204 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3205 3206 until = UINT64_MAX; 3207 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3208 until = *pint; 3209 3210 since = 0; 
3211 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3212 since = *pint; 3213 3214 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3215 return 0; 3216 3217 // if we have until, start there, otherwise just use max 3218 ndb_tsid_init(&key, high_key, until); 3219 k.mv_data = &key; 3220 k.mv_size = sizeof(key); 3221 3222 if (!ndb_cursor_start(cur, &k, &v)) 3223 return 1; 3224 3225 while (!query_is_full(results, limit)) { 3226 pkey = (struct ndb_tsid *)k.mv_data; 3227 note_id = *(uint64_t*)v.mv_data; 3228 assert(v.mv_size == 8); 3229 3230 // don't continue the scan if we're below `since` 3231 if (pkey->timestamp < since) 3232 break; 3233 3234 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3235 goto next; 3236 3237 // does this entry match our filter? 3238 if (!ndb_filter_matches_with(filter, note, 0)) 3239 goto next; 3240 3241 ndb_query_result_init(&res, note, (uint64_t)note_size, note_id); 3242 if (!push_query_result(results, &res)) 3243 break; 3244 next: 3245 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3246 break; 3247 } 3248 3249 mdb_cursor_close(cur); 3250 return 1; 3251 } 3252 3253 static int ndb_query_plan_execute_tags(struct ndb_txn *txn, 3254 struct ndb_filter *filter, 3255 struct ndb_query_results *results, 3256 int limit) 3257 { 3258 MDB_cursor *cur; 3259 MDB_dbi db; 3260 MDB_val k, v; 3261 int len, taglen, rc, i; 3262 uint64_t *pint, until, note_id; 3263 size_t note_size; 3264 unsigned char key_buffer[255]; 3265 struct ndb_note *note; 3266 struct ndb_filter_elements *tags; 3267 unsigned char *tag; 3268 struct ndb_query_result res; 3269 3270 db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3271 3272 if (!(tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS))) 3273 return 0; 3274 3275 until = UINT64_MAX; 3276 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3277 until = *pint; 3278 3279 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3280 return 0; 3281 3282 for (i = 0; i < tags->count; i++) { 3283 tag = 
ndb_filter_get_id_element(filter, tags, i); 3284 3285 taglen = tags->field.elem_type == NDB_ELEMENT_ID 3286 ? 32 : strlen((const char*)tag); 3287 3288 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3289 tags->field.tag, tag, taglen, 3290 until))) 3291 return 0; 3292 3293 k.mv_data = key_buffer; 3294 k.mv_size = len; 3295 3296 if (!ndb_cursor_start(cur, &k, &v)) 3297 continue; 3298 3299 // for each id in our ids filter, find in the db 3300 while (!query_is_full(results, limit)) { 3301 // check if tag value matches, bail if not 3302 if (((unsigned char *)k.mv_data)[0] != tags->field.tag) 3303 break; 3304 3305 // check if tag value matches, bail if not 3306 if (taglen != k.mv_size - 9) 3307 break; 3308 3309 if (memcmp((unsigned char *)k.mv_data+1, tag, k.mv_size-9)) 3310 break; 3311 3312 note_id = *(uint64_t*)v.mv_data; 3313 3314 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3315 goto next; 3316 3317 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS)) 3318 goto next; 3319 3320 ndb_query_result_init(&res, note, note_size, note_id); 3321 if (!push_query_result(results, &res)) 3322 break; 3323 3324 next: 3325 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3326 break; 3327 } 3328 } 3329 3330 mdb_cursor_close(cur); 3331 return 1; 3332 } 3333 3334 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, 3335 struct ndb_filter *filter, 3336 struct ndb_query_results *results, 3337 int limit) 3338 { 3339 MDB_cursor *cur; 3340 MDB_dbi db; 3341 MDB_val k, v; 3342 struct ndb_note *note; 3343 struct ndb_u64_ts tsid, *ptsid; 3344 struct ndb_filter_elements *kinds; 3345 struct ndb_query_result res; 3346 uint64_t kind, note_id, until, since, *pint; 3347 size_t note_size; 3348 int i, rc; 3349 3350 // we should have kinds in a kinds filter! 
3351 if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS))) 3352 return 0; 3353 3354 until = UINT64_MAX; 3355 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3356 until = *pint; 3357 3358 since = 0; 3359 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3360 since = *pint; 3361 3362 db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3363 3364 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3365 return 0; 3366 3367 for (i = 0; i < kinds->count; i++) { 3368 if (query_is_full(results, limit)) 3369 break; 3370 3371 kind = kinds->elements[i]; 3372 ndb_debug("kind %" PRIu64 "\n", kind); 3373 ndb_u64_ts_init(&tsid, kind, until); 3374 3375 k.mv_data = &tsid; 3376 k.mv_size = sizeof(tsid); 3377 3378 if (!ndb_cursor_start(cur, &k, &v)) 3379 continue; 3380 3381 // for each id in our ids filter, find in the db 3382 while (!query_is_full(results, limit)) { 3383 ptsid = (struct ndb_u64_ts *)k.mv_data; 3384 if (ptsid->u64 != kind) 3385 break; 3386 3387 // don't continue the scan if we're below `since` 3388 if (ptsid->timestamp < since) 3389 break; 3390 3391 note_id = *(uint64_t*)v.mv_data; 3392 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3393 goto next; 3394 3395 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS)) 3396 goto next; 3397 3398 ndb_query_result_init(&res, note, note_size, note_id); 3399 if (!push_query_result(results, &res)) 3400 break; 3401 3402 next: 3403 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3404 break; 3405 } 3406 } 3407 3408 mdb_cursor_close(cur); 3409 return 1; 3410 } 3411 3412 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter) 3413 { 3414 struct ndb_filter_elements *ids, *kinds, *authors, *tags; 3415 3416 ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS); 3417 kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS); 3418 authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS); 3419 tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS); 3420 3421 // this is rougly 
similar to the heuristic in strfry's dbscan 3422 if (ids) { 3423 return NDB_PLAN_IDS; 3424 } else if (kinds && authors && authors->count <= 10) { 3425 return NDB_PLAN_AUTHOR_KINDS; 3426 } else if (authors && authors->count <= 10) { 3427 return NDB_PLAN_AUTHORS; 3428 } else if (tags && tags->count <= 10) { 3429 return NDB_PLAN_TAGS; 3430 } else if (kinds) { 3431 return NDB_PLAN_KINDS; 3432 } 3433 3434 return NDB_PLAN_CREATED; 3435 } 3436 3437 static const char *ndb_query_plan_name(int plan_id) 3438 { 3439 switch (plan_id) { 3440 case NDB_PLAN_IDS: return "ids"; 3441 case NDB_PLAN_KINDS: return "kinds"; 3442 case NDB_PLAN_TAGS: return "tags"; 3443 case NDB_PLAN_CREATED: return "created"; 3444 case NDB_PLAN_AUTHORS: return "authors"; 3445 } 3446 3447 return "unknown"; 3448 } 3449 3450 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, 3451 struct ndb_query_result *res, int capacity, 3452 int *results_out) 3453 { 3454 struct ndb_query_results results; 3455 uint64_t limit, *pint; 3456 enum ndb_query_plan plan; 3457 limit = capacity; 3458 3459 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 3460 limit = *pint; 3461 3462 limit = min(capacity, limit); 3463 make_cursor((unsigned char *)res, 3464 ((unsigned char *)res) + limit * sizeof(*res), 3465 &results.cur); 3466 3467 plan = ndb_filter_plan(filter); 3468 ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan)); 3469 switch (plan) { 3470 // We have a list of ids, just open a cursor and jump to each once 3471 case NDB_PLAN_IDS: 3472 if (!ndb_query_plan_execute_ids(txn, filter, &results, limit)) 3473 return 0; 3474 break; 3475 3476 // We have just kinds, just scan the kind index 3477 case NDB_PLAN_KINDS: 3478 if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit)) 3479 return 0; 3480 break; 3481 3482 case NDB_PLAN_TAGS: 3483 if (!ndb_query_plan_execute_tags(txn, filter, &results, limit)) 3484 return 0; 3485 break; 3486 case NDB_PLAN_CREATED: 3487 if 
(!ndb_query_plan_execute_created_at(txn, filter, &results, limit)) 3488 return 0; 3489 break; 3490 case NDB_PLAN_AUTHORS: 3491 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3492 return 0; 3493 break; 3494 case NDB_PLAN_AUTHOR_KINDS: 3495 /* TODO: author kinds 3496 if (!ndb_query_plan_execute_author_kinds(txn, filter, &results, limit)) 3497 return 0; 3498 */ 3499 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3500 return 0; 3501 break; 3502 } 3503 3504 *results_out = cursor_count(&results.cur, sizeof(*res)); 3505 return 1; 3506 } 3507 3508 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, 3509 struct ndb_query_result *results, int result_capacity, int *count) 3510 { 3511 int i, out; 3512 struct ndb_query_result *p = results; 3513 3514 out = 0; 3515 *count = 0; 3516 3517 for (i = 0; i < num_filters; i++) { 3518 if (!ndb_query_filter(txn, &filters[i], p, 3519 result_capacity, &out)) { 3520 return 0; 3521 } 3522 3523 *count += out; 3524 p += out; 3525 result_capacity -= out; 3526 if (result_capacity <= 0) 3527 break; 3528 } 3529 3530 // sort results 3531 qsort(results, *count, sizeof(*results), compare_query_results); 3532 return 1; 3533 } 3534 3535 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note, 3536 uint64_t note_key) 3537 { 3538 unsigned char key_buffer[255]; 3539 struct ndb_iterator iter; 3540 struct ndb_str tkey, tval; 3541 char tchar; 3542 int len, rc; 3543 MDB_val key, val; 3544 MDB_dbi tags_db; 3545 3546 tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3547 3548 ndb_tags_iterate_start(note, &iter); 3549 3550 while (ndb_tags_iterate_next(&iter)) { 3551 if (iter.tag->count < 2) 3552 continue; 3553 3554 tkey = ndb_tag_str(note, iter.tag, 0); 3555 3556 // we only write indices for 1-char tags. 
3557 tchar = tkey.str[0]; 3558 if (tchar == 0 || tkey.str[1] != 0) 3559 continue; 3560 3561 tval = ndb_tag_str(note, iter.tag, 1); 3562 len = ndb_str_len(&tval); 3563 3564 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3565 tchar, tval.id, (unsigned char)len, 3566 ndb_note_created_at(note)))) { 3567 // this will fail when we try to encode a key that is 3568 // too big 3569 continue; 3570 } 3571 3572 //ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len); 3573 3574 key.mv_data = key_buffer; 3575 key.mv_size = len; 3576 3577 val.mv_data = ¬e_key; 3578 val.mv_size = sizeof(note_key); 3579 3580 if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) { 3581 ndb_debug("write note tag index to db failed: %s\n", 3582 mdb_strerror(rc)); 3583 return 0; 3584 } 3585 } 3586 3587 return 1; 3588 } 3589 3590 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 3591 uint64_t note_key) 3592 { 3593 struct ndb_u64_ts tsid; 3594 int rc; 3595 MDB_val key, val; 3596 MDB_dbi kind_db; 3597 3598 ndb_u64_ts_init(&tsid, note->kind, note->created_at); 3599 3600 key.mv_data = &tsid; 3601 key.mv_size = sizeof(tsid); 3602 val.mv_data = ¬e_key; 3603 val.mv_size = sizeof(note_key); 3604 3605 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3606 3607 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 3608 ndb_debug("write note kind index to db failed: %s\n", 3609 mdb_strerror(rc)); 3610 return 0; 3611 } 3612 3613 return 1; 3614 } 3615 3616 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 3617 int word_len, int word_index, 3618 uint64_t timestamp, uint64_t note_id) 3619 { 3620 // cap to some reasonable key size 3621 unsigned char buffer[1024]; 3622 int keysize, rc; 3623 MDB_val k, v; 3624 MDB_dbi text_db; 3625 3626 // build our compressed text index key 3627 if (!ndb_make_text_search_key(buffer, sizeof(buffer), word_index, 3628 word_len, word, timestamp, note_id, 3629 &keysize)) { 3630 // probably too big 3631 3632 
return 0; 3633 } 3634 3635 k.mv_data = buffer; 3636 k.mv_size = keysize; 3637 3638 v.mv_data = NULL; 3639 v.mv_size = 0; 3640 3641 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 3642 3643 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 3644 ndb_debug("write note text index to db failed: %s\n", 3645 mdb_strerror(rc)); 3646 return 0; 3647 } 3648 3649 return 1; 3650 } 3651 3652 3653 3654 // break a string into individual words for querying or for building the 3655 // fulltext search index. This is callback based so we don't need to 3656 // build up an intermediate structure 3657 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 3658 { 3659 int word_len, words; 3660 const char *word; 3661 3662 words = 0; 3663 3664 while (cur->p < cur->end) { 3665 consume_whitespace_or_punctuation(cur); 3666 if (cur->p >= cur->end) 3667 break; 3668 word = (const char *)cur->p; 3669 3670 if (!consume_until_boundary(cur)) 3671 break; 3672 3673 // start of word or end 3674 word_len = cur->p - (unsigned char *)word; 3675 if (word_len == 0 && cur->p >= cur->end) 3676 break; 3677 3678 if (word_len == 0) { 3679 if (!cursor_skip(cur, 1)) 3680 break; 3681 continue; 3682 } 3683 3684 //ndb_debug("writing word index '%.*s'\n", word_len, word); 3685 3686 if (!fn(ctx, word, word_len, words)) 3687 continue; 3688 3689 words++; 3690 } 3691 3692 return 1; 3693 } 3694 3695 struct ndb_word_writer_ctx 3696 { 3697 struct ndb_txn *txn; 3698 struct ndb_note *note; 3699 uint64_t note_id; 3700 }; 3701 3702 static int ndb_fulltext_word_writer(void *ctx, 3703 const char *word, int word_len, int words) 3704 { 3705 struct ndb_word_writer_ctx *wctx = ctx; 3706 3707 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 3708 wctx->note->created_at, wctx->note_id)) { 3709 // too big to write this one, just skip it 3710 ndb_debug("failed to write word '%.*s' to index\n", word_len, word); 3711 3712 return 0; 3713 } 3714 3715 //fprintf(stderr, "wrote '%.*s' to note text 
index\n", word_len, word); 3716 return 1; 3717 } 3718 3719 static int ndb_write_note_fulltext_index(struct ndb_txn *txn, 3720 struct ndb_note *note, 3721 uint64_t note_id) 3722 { 3723 struct cursor cur; 3724 unsigned char *content; 3725 struct ndb_str str; 3726 struct ndb_word_writer_ctx ctx; 3727 3728 str = ndb_note_str(note, ¬e->content); 3729 // I don't think this should happen? 3730 if (unlikely(str.flag == NDB_PACKED_ID)) 3731 return 0; 3732 3733 content = (unsigned char *)str.str; 3734 3735 make_cursor(content, content + note->content_length, &cur); 3736 3737 ctx.txn = txn; 3738 ctx.note = note; 3739 ctx.note_id = note_id; 3740 3741 ndb_parse_words(&cur, &ctx, ndb_fulltext_word_writer); 3742 3743 return 1; 3744 } 3745 3746 static int ndb_parse_search_words(void *ctx, const char *word_str, int word_len, int word_index) 3747 { 3748 (void)word_index; 3749 struct ndb_search_words *words = ctx; 3750 struct ndb_word *word; 3751 3752 if (words->num_words + 1 > MAX_TEXT_SEARCH_WORDS) 3753 return 0; 3754 3755 word = &words->words[words->num_words++]; 3756 word->word = word_str; 3757 word->word_len = word_len; 3758 3759 return 1; 3760 } 3761 3762 static void ndb_search_words_init(struct ndb_search_words *words) 3763 { 3764 words->num_words = 0; 3765 } 3766 3767 static int prefix_count(const char *str1, int len1, const char *str2, int len2) { 3768 int i, count = 0; 3769 int min_len = len1 < len2 ? len1 : len2; 3770 3771 for (i = 0; i < min_len; i++) { 3772 // case insensitive 3773 if (tolower(str1[i]) == tolower(str2[i])) 3774 count++; 3775 else 3776 break; 3777 } 3778 3779 return count; 3780 } 3781 3782 static int ndb_prefix_matches(struct ndb_text_search_result *result, 3783 struct ndb_word *search_word) 3784 { 3785 // Empty strings shouldn't happen but let's 3786 if (result->key.str_len < 2 || search_word->word_len < 2) 3787 return 0; 3788 3789 // make sure we at least have two matching prefix characters. 
exact 3790 // matches are nice but range searches allow us to match prefixes as 3791 // well. A double-char prefix is suffient, but maybe we could up this 3792 // in the future. 3793 // 3794 // TODO: How are we handling utf-8 prefix matches like 3795 // japanese? 3796 // 3797 if ( result->key.str[0] != tolower(search_word->word[0]) 3798 && result->key.str[1] != tolower(search_word->word[1]) 3799 ) 3800 return 0; 3801 3802 // count the number of prefix-matched characters. This will be used 3803 // for ranking search results 3804 result->prefix_chars = prefix_count(result->key.str, 3805 result->key.str_len, 3806 search_word->word, 3807 search_word->word_len); 3808 3809 if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5)) 3810 return 0; 3811 3812 return 1; 3813 } 3814 3815 // This is called when scanning the full text search index. Scanning stops 3816 // when we no longer have a prefix match for the word 3817 static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, 3818 MDB_val *k, struct ndb_word *search_word, 3819 struct ndb_text_search_result *last_result, 3820 struct ndb_text_search_result *result, 3821 MDB_cursor_op order_op) 3822 { 3823 struct cursor key_cursor; 3824 //struct ndb_text_search_key search_key; 3825 MDB_val v; 3826 int retries; 3827 retries = -1; 3828 3829 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 3830 3831 // When op is MDB_SET_RANGE, this initializes the search. Position 3832 // the cursor at the next key greater than or equal to the specified 3833 // key. 
3834 // 3835 // Subsequent searches should use MDB_NEXT 3836 if (mdb_cursor_get(cursor, k, &v, op)) { 3837 // we should only do this if we're going in reverse 3838 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 3839 // if set range worked and our key exists, it should be 3840 // the one right before this one 3841 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 3842 return 0; 3843 } else { 3844 return 0; 3845 } 3846 } 3847 3848 retry: 3849 retries++; 3850 /* 3851 printf("continuing from "); 3852 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 3853 ndb_print_text_search_key(&search_key); 3854 } else { printf("??"); } 3855 printf("\n"); 3856 */ 3857 3858 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 3859 3860 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 3861 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 3862 return 0; 3863 } 3864 3865 if (last_result) { 3866 if (last_result->key.note_id != result->key.note_id) 3867 return 0; 3868 } 3869 3870 // On success, this could still be not related at all. 3871 // It could just be adjacent to the word. Let's check 3872 // if we have a matching prefix at least. 3873 3874 // Before we unpack the entire key, let's quickly 3875 // unpack just the string to check the prefix. 
We don't 3876 // need to unpack the entire key if the prefix doesn't 3877 // match 3878 if (!ndb_unpack_text_search_key_string(&key_cursor, 3879 &result->key.str, 3880 &result->key.str_len)) { 3881 // this should never happen 3882 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 3883 return 0; 3884 } 3885 3886 if (!ndb_prefix_matches(result, search_word)) { 3887 /* 3888 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 3889 result->key.str_len, result->key.str, 3890 search_word->word_len, search_word->word); 3891 */ 3892 // we should only do this if we're going in reverse 3893 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 3894 // if set range worked and our key exists, it should be 3895 // the one right before this one 3896 mdb_cursor_get(cursor, k, &v, MDB_PREV); 3897 goto retry; 3898 } else { 3899 return 0; 3900 } 3901 } 3902 3903 // Unpack the remaining text search key, we will need this information 3904 // when building up our search results. 
3905 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 3906 // This should never happen 3907 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 3908 return 0; 3909 } 3910 3911 /* 3912 if (last_result) { 3913 if (result->key.word_index < last_result->key.word_index) { 3914 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 3915 result->key.str_len, result->key.str, 3916 last_result->key.str_len, last_result->key.str); 3917 return 0; 3918 } 3919 } 3920 */ 3921 3922 return 1; 3923 } 3924 3925 static void ndb_text_search_results_init( 3926 struct ndb_text_search_results *results) { 3927 results->num_results = 0; 3928 } 3929 3930 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 3931 { 3932 cfg->order = NDB_ORDER_DESCENDING; 3933 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 3934 } 3935 3936 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 3937 enum ndb_search_order order) 3938 { 3939 cfg->order = order; 3940 } 3941 3942 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 3943 { 3944 cfg->limit = limit; 3945 } 3946 3947 int ndb_text_search(struct ndb_txn *txn, const char *query, 3948 struct ndb_text_search_results *results, 3949 struct ndb_text_search_config *config) 3950 { 3951 unsigned char buffer[1024], *buf; 3952 unsigned char saved_buf[1024], *saved; 3953 struct ndb_text_search_result *result, *last_result; 3954 struct ndb_text_search_result candidate, last_candidate; 3955 struct ndb_search_words search_words; 3956 //struct ndb_text_search_key search_key; 3957 struct ndb_word *search_word; 3958 struct cursor cur; 3959 ndb_text_search_key_order_fn key_order_fn; 3960 MDB_dbi text_db; 3961 MDB_cursor *cursor; 3962 MDB_val k, v; 3963 int i, j, keysize, saved_size, limit; 3964 MDB_cursor_op op, order_op; 3965 3966 saved = NULL; 3967 ndb_text_search_results_init(results); 3968 ndb_search_words_init(&search_words); 3969 3970 // search config 
3971 limit = MAX_TEXT_SEARCH_RESULTS; 3972 order_op = MDB_PREV; 3973 key_order_fn = ndb_make_text_search_key_high; 3974 if (config) { 3975 if (config->order == NDB_ORDER_ASCENDING) { 3976 order_op = MDB_NEXT; 3977 key_order_fn = ndb_make_text_search_key_low; 3978 } 3979 limit = min(limit, config->limit); 3980 } 3981 // end search config 3982 3983 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 3984 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 3985 3986 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 3987 if (search_words.num_words == 0) 3988 return 0; 3989 3990 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 3991 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 3992 return 0; 3993 } 3994 3995 // for each word, we recursively find all of the submatches 3996 while (results->num_results < limit) { 3997 last_result = NULL; 3998 result = &results->results[results->num_results]; 3999 4000 // if we have saved, then we continue from the last root search 4001 // sequence 4002 if (saved) { 4003 buf = saved_buf; 4004 saved = NULL; 4005 keysize = saved_size; 4006 4007 k.mv_data = buf; 4008 k.mv_size = saved_size; 4009 4010 // reposition the cursor so we can continue 4011 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 4012 break; 4013 4014 op = order_op; 4015 } else { 4016 // construct a packed fulltext search key using this 4017 // word this key doesn't contain any timestamp or index 4018 // info, so it should range match instead of exact 4019 // match 4020 if (!key_order_fn(buffer, sizeof(buffer), 4021 search_words.words[0].word_len, 4022 search_words.words[0].word, &keysize)) 4023 { 4024 // word is too big to fit in 1024-sized key 4025 continue; 4026 } 4027 4028 buf = buffer; 4029 op = MDB_SET_RANGE; 4030 } 4031 4032 for (j = 0; j < search_words.num_words; j++) { 4033 search_word = &search_words.words[j]; 4034 4035 // shouldn't happen but let's be defensive a bit 4036 if 
(search_word->word_len == 0) 4037 continue; 4038 4039 // if we already matched a note in this phrase, make 4040 // sure we're including the note id in the query 4041 if (last_result) { 4042 // we are narrowing down a search. 4043 // if we already have this note id, just continue 4044 for (i = 0; i < results->num_results; i++) { 4045 if (results->results[i].key.note_id == last_result->key.note_id) 4046 goto cont; 4047 } 4048 4049 if (!ndb_make_noted_text_search_key( 4050 buffer, sizeof(buffer), 4051 search_word->word_len, 4052 search_word->word, 4053 last_result->key.timestamp, 4054 last_result->key.note_id, 4055 &keysize)) 4056 { 4057 continue; 4058 } 4059 4060 buf = buffer; 4061 } 4062 4063 k.mv_data = buf; 4064 k.mv_size = keysize; 4065 4066 if (!ndb_text_search_next_word(cursor, op, &k, 4067 search_word, 4068 last_result, 4069 &candidate, 4070 order_op)) { 4071 break; 4072 } 4073 4074 *result = candidate; 4075 op = MDB_SET_RANGE; 4076 4077 // save the first key match, since we will continue from 4078 // this on the next root word result 4079 if (j == 0 && !saved) { 4080 memcpy(saved_buf, k.mv_data, k.mv_size); 4081 saved = saved_buf; 4082 saved_size = k.mv_size; 4083 } 4084 4085 last_candidate = *result; 4086 last_result = &last_candidate; 4087 } 4088 4089 cont: 4090 // we matched all of the queries! 
4091 if (j == search_words.num_words) { 4092 results->num_results++; 4093 } else if (j == 0) { 4094 break; 4095 } 4096 4097 } 4098 4099 mdb_cursor_close(cursor); 4100 4101 return 1; 4102 } 4103 4104 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key, 4105 struct ndb_blocks *blocks) 4106 { 4107 int rc; 4108 MDB_val key, val; 4109 4110 // make sure we're not writing the owned flag to the db 4111 blocks->flags &= ~NDB_BLOCK_FLAG_OWNED; 4112 4113 key.mv_data = ¬e_key; 4114 key.mv_size = sizeof(note_key); 4115 val.mv_data = blocks; 4116 val.mv_size = ndb_blocks_total_size(blocks); 4117 assert((val.mv_size % 8) == 0); 4118 4119 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) { 4120 ndb_debug("write version to note_blocks failed: %s\n", 4121 mdb_strerror(rc)); 4122 return; 4123 } 4124 } 4125 4126 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, 4127 uint64_t note_key, unsigned char *scratch, 4128 size_t scratch_size) 4129 { 4130 size_t content_len; 4131 const char *content; 4132 struct ndb_blocks *blocks; 4133 4134 content_len = ndb_note_content_length(note); 4135 content = ndb_note_content(note); 4136 4137 if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) { 4138 //ndb_debug("failed to parse content '%.*s'\n", content_len, content); 4139 return 0; 4140 } 4141 4142 ndb_write_blocks(txn, note_key, blocks); 4143 return 1; 4144 } 4145 4146 static uint64_t ndb_write_note(struct ndb_txn *txn, 4147 struct ndb_writer_note *note, 4148 unsigned char *scratch, size_t scratch_size, 4149 uint32_t ndb_flags) 4150 { 4151 int rc; 4152 uint64_t note_key; 4153 MDB_dbi note_db; 4154 MDB_val key, val; 4155 4156 // let's quickly sanity check if we already have this note 4157 if (ndb_get_notekey_by_id(txn, note->note->id)) 4158 return 0; 4159 4160 // get dbs 4161 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 4162 4163 // get new key 4164 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 
4165 4166 // write note to event store 4167 key.mv_data = ¬e_key; 4168 key.mv_size = sizeof(note_key); 4169 val.mv_data = note->note; 4170 val.mv_size = note->note_len; 4171 4172 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 4173 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 4174 return 0; 4175 } 4176 4177 ndb_write_note_id_index(txn, note->note, note_key); 4178 ndb_write_note_kind_index(txn, note->note, note_key); 4179 ndb_write_note_tag_index(txn, note->note, note_key); 4180 ndb_write_note_pubkey_index(txn, note->note, note_key); 4181 ndb_write_note_pubkey_kind_index(txn, note->note, note_key); 4182 4183 // only parse content and do fulltext index on text and longform notes 4184 if (note->note->kind == 1 || note->note->kind == 30023) { 4185 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_FULLTEXT)) { 4186 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 4187 return 0; 4188 } 4189 4190 // write note blocks 4191 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_NOTE_BLOCKS)) { 4192 ndb_write_new_blocks(txn, note->note, note_key, scratch, scratch_size); 4193 } 4194 } 4195 4196 if (note->note->kind == 7 && !ndb_flag_set(ndb_flags, NDB_FLAG_NO_STATS)) { 4197 ndb_write_reaction_stats(txn, note->note); 4198 } 4199 4200 return note_key; 4201 } 4202 4203 static void ndb_monitor_lock(struct ndb_monitor *mon) { 4204 pthread_mutex_lock(&mon->mutex); 4205 } 4206 4207 static void ndb_monitor_unlock(struct ndb_monitor *mon) { 4208 pthread_mutex_unlock(&mon->mutex); 4209 } 4210 4211 struct written_note { 4212 uint64_t note_id; 4213 struct ndb_writer_note *note; 4214 }; 4215 4216 // When the data has been committed to the database, take all of the written 4217 // notes, check them against subscriptions, and then write to the subscription 4218 // inbox for all matching notes 4219 static void ndb_notify_subscriptions(struct ndb_monitor *monitor, 4220 struct written_note *wrote, int num_notes) 4221 { 4222 int i, k; 4223 int pushed; 4224 struct 
written_note *written; 4225 struct ndb_note *note; 4226 struct ndb_subscription *sub; 4227 4228 ndb_monitor_lock(monitor); 4229 4230 for (i = 0; i < monitor->num_subscriptions; i++) { 4231 sub = &monitor->subscriptions[i]; 4232 ndb_debug("checking subscription %d, %d notes\n", i, num_notes); 4233 4234 pushed = 0; 4235 for (k = 0; k < num_notes; k++) { 4236 written = &wrote[k]; 4237 note = written->note->note; 4238 4239 if (ndb_filter_group_matches(&sub->group, note)) { 4240 ndb_debug("pushing note\n"); 4241 4242 if (!prot_queue_push(&sub->inbox, &written->note_id)) { 4243 ndb_debug("couldn't push note to subscriber"); 4244 } else { 4245 pushed++; 4246 } 4247 } else { 4248 ndb_debug("not pushing note\n"); 4249 } 4250 } 4251 4252 // After pushing all of the matching notes, check to see if we 4253 // have a registered subscription callback. If so, we call it. 4254 // The callback needs to call ndb_poll_for_notes to pull data 4255 // that was just pushed to the queue in the for loop above. 4256 if (monitor->sub_cb != NULL && pushed > 0) { 4257 monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); 4258 } 4259 } 4260 4261 ndb_monitor_unlock(monitor); 4262 } 4263 4264 uint64_t ndb_write_note_and_profile( 4265 struct ndb_txn *txn, 4266 struct ndb_writer_profile *profile, 4267 unsigned char *scratch, 4268 size_t scratch_size, 4269 uint32_t ndb_flags) 4270 { 4271 uint64_t note_nkey; 4272 4273 note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags); 4274 4275 if (profile->record.builder) { 4276 // only write if parsing didn't fail 4277 ndb_write_profile(txn, profile, note_nkey); 4278 } 4279 4280 return note_nkey; 4281 } 4282 4283 // only to be called from the writer thread 4284 static int ndb_write_version(struct ndb_txn *txn, uint64_t version) 4285 { 4286 int rc; 4287 MDB_val key, val; 4288 uint64_t version_key; 4289 4290 version_key = NDB_META_KEY_VERSION; 4291 4292 key.mv_data = &version_key; 4293 key.mv_size = sizeof(version_key); 4294 val.mv_data = 
&version; 4295 val.mv_size = sizeof(version); 4296 4297 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { 4298 ndb_debug("write version to ndb_meta failed: %s\n", 4299 mdb_strerror(rc)); 4300 return 0; 4301 } 4302 4303 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 4304 return 1; 4305 } 4306 4307 4308 static int ndb_run_migrations(struct ndb_txn *txn) 4309 { 4310 int64_t version, latest_version, i; 4311 4312 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 4313 4314 if ((version = ndb_db_version(txn)) == -1) { 4315 ndb_debug("run_migrations: no version found, assuming new db\n"); 4316 version = latest_version; 4317 4318 // no version found. fresh db? 4319 if (!ndb_write_version(txn, version)) { 4320 fprintf(stderr, "run_migrations: failed writing db version"); 4321 return 0; 4322 } 4323 4324 return 1; 4325 } else { 4326 ndb_debug("ndb: version %" PRIu64 " found\n", version); 4327 } 4328 4329 if (version < latest_version) 4330 fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", 4331 (int)version, (int)latest_version); 4332 4333 for (i = version; i < latest_version; i++) { 4334 if (!MIGRATIONS[i].fn(txn)) { 4335 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 4336 return 0; 4337 } 4338 4339 if (!ndb_write_version(txn, i+1)) { 4340 fprintf(stderr, "run_migrations: failed writing db version"); 4341 return 0; 4342 } 4343 4344 version = i+1; 4345 } 4346 4347 return 1; 4348 } 4349 4350 4351 static void *ndb_writer_thread(void *data) 4352 { 4353 ndb_debug("started writer thread\n"); 4354 struct ndb_writer *writer = data; 4355 struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; 4356 struct written_note written_notes[THREAD_QUEUE_BATCH]; 4357 size_t scratch_size; 4358 int i, popped, done, needs_commit, num_notes; 4359 uint64_t note_nkey; 4360 struct ndb_txn txn; 4361 unsigned char *scratch; 4362 4363 // 8mb scratch buffer for parsing note content 4364 scratch_size = 8 * 1024 * 
1024; 4365 scratch = malloc(scratch_size); 4366 MDB_txn *mdb_txn = NULL; 4367 ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); 4368 4369 done = 0; 4370 while (!done) { 4371 txn.mdb_txn = NULL; 4372 num_notes = 0; 4373 ndb_debug("writer waiting for items\n"); 4374 popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); 4375 ndb_debug("writer popped %d items\n", popped); 4376 4377 needs_commit = 0; 4378 for (i = 0 ; i < popped; i++) { 4379 msg = &msgs[i]; 4380 switch (msg->type) { 4381 case NDB_WRITER_NOTE: needs_commit = 1; break; 4382 case NDB_WRITER_PROFILE: needs_commit = 1; break; 4383 case NDB_WRITER_DBMETA: needs_commit = 1; break; 4384 case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; 4385 case NDB_WRITER_BLOCKS: needs_commit = 1; break; 4386 case NDB_WRITER_MIGRATE: needs_commit = 1; break; 4387 case NDB_WRITER_QUIT: break; 4388 } 4389 } 4390 4391 if (needs_commit && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) 4392 { 4393 fprintf(stderr, "writer thread txn_begin failed"); 4394 // should definitely not happen unless DB is full 4395 // or something ? 
4396 continue; 4397 } 4398 4399 for (i = 0; i < popped; i++) { 4400 msg = &msgs[i]; 4401 4402 switch (msg->type) { 4403 case NDB_WRITER_QUIT: 4404 // quits are handled before this 4405 ndb_debug("writer thread got quit message\n"); 4406 done = 1; 4407 continue; 4408 case NDB_WRITER_PROFILE: 4409 note_nkey = 4410 ndb_write_note_and_profile( 4411 &txn, 4412 &msg->profile, 4413 scratch, 4414 scratch_size, 4415 writer->ndb_flags); 4416 4417 if (note_nkey > 0) { 4418 written_notes[num_notes++] = 4419 (struct written_note){ 4420 .note_id = note_nkey, 4421 .note = &msg->profile.note, 4422 }; 4423 } else { 4424 ndb_debug("failed to write note\n"); 4425 } 4426 break; 4427 case NDB_WRITER_NOTE: 4428 note_nkey = ndb_write_note(&txn, &msg->note, 4429 scratch, 4430 scratch_size, 4431 writer->ndb_flags); 4432 4433 if (note_nkey > 0) { 4434 written_notes[num_notes++] = (struct written_note){ 4435 .note_id = note_nkey, 4436 .note = &msg->note, 4437 }; 4438 } 4439 break; 4440 case NDB_WRITER_DBMETA: 4441 ndb_write_version(&txn, msg->ndb_meta.version); 4442 break; 4443 case NDB_WRITER_BLOCKS: 4444 ndb_write_blocks(&txn, msg->blocks.note_key, 4445 msg->blocks.blocks); 4446 break; 4447 case NDB_WRITER_MIGRATE: 4448 if (!ndb_run_migrations(&txn)) { 4449 mdb_txn_abort(txn.mdb_txn); 4450 goto bail; 4451 } 4452 break; 4453 case NDB_WRITER_PROFILE_LAST_FETCH: 4454 ndb_writer_last_profile_fetch(&txn, 4455 msg->last_fetch.pubkey, 4456 msg->last_fetch.fetched_at 4457 ); 4458 break; 4459 } 4460 } 4461 4462 // commit writes 4463 if (needs_commit) { 4464 if (!ndb_end_query(&txn)) { 4465 ndb_debug("writer thread txn commit failed\n"); 4466 } else { 4467 ndb_debug("notifying subscriptions, %d notes\n", num_notes); 4468 ndb_notify_subscriptions(writer->monitor, 4469 written_notes, 4470 num_notes); 4471 // update subscriptions 4472 } 4473 } 4474 4475 // free notes 4476 for (i = 0; i < popped; i++) { 4477 msg = &msgs[i]; 4478 if (msg->type == NDB_WRITER_NOTE) { 4479 free(msg->note.note); 4480 } else 
if (msg->type == NDB_WRITER_PROFILE) { 4481 free(msg->profile.note.note); 4482 //ndb_profile_record_builder_free(&msg->profile.record); 4483 } else if (msg->type == NDB_WRITER_BLOCKS) { 4484 ndb_blocks_free(msg->blocks.blocks); 4485 } 4486 } 4487 } 4488 4489 bail: 4490 free(scratch); 4491 ndb_debug("quitting writer thread\n"); 4492 return NULL; 4493 } 4494 4495 static void *ndb_ingester_thread(void *data) 4496 { 4497 secp256k1_context *ctx; 4498 struct thread *thread = data; 4499 struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; 4500 struct ndb_lmdb *lmdb = ingester->lmdb; 4501 struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; 4502 struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; 4503 int i, to_write, popped, done, any_event; 4504 MDB_txn *read_txn = NULL; 4505 int rc; 4506 4507 ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); 4508 ndb_debug("started ingester thread\n"); 4509 4510 done = 0; 4511 while (!done) { 4512 to_write = 0; 4513 any_event = 0; 4514 4515 popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 4516 ndb_debug("ingester popped %d items\n", popped); 4517 4518 for (i = 0; i < popped; i++) { 4519 msg = &msgs[i]; 4520 if (msg->type == NDB_INGEST_EVENT) { 4521 any_event = 1; 4522 break; 4523 } 4524 } 4525 4526 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 4527 // this is bad 4528 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 4529 mdb_strerror(rc)); 4530 continue; 4531 } 4532 4533 for (i = 0; i < popped; i++) { 4534 msg = &msgs[i]; 4535 switch (msg->type) { 4536 case NDB_INGEST_QUIT: 4537 done = 1; 4538 break; 4539 4540 case NDB_INGEST_EVENT: 4541 out = &outs[to_write]; 4542 if (ndb_ingester_process_event(ctx, ingester, 4543 &msg->event, out, 4544 read_txn)) { 4545 to_write++; 4546 } 4547 } 4548 } 4549 4550 if (any_event) 4551 mdb_txn_abort(read_txn); 4552 4553 if (to_write > 0) { 4554 ndb_debug("pushing %d events to write queue\n", 
to_write); 4555 if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) { 4556 ndb_debug("failed pushing %d events to write queue\n", to_write); 4557 } 4558 } 4559 } 4560 4561 ndb_debug("quitting ingester thread\n"); 4562 secp256k1_context_destroy(ctx); 4563 return NULL; 4564 } 4565 4566 4567 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, 4568 struct ndb_monitor *monitor, uint32_t ndb_flags) 4569 { 4570 writer->lmdb = lmdb; 4571 writer->monitor = monitor; 4572 writer->ndb_flags = ndb_flags; 4573 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 4574 writer->queue_buf = malloc(writer->queue_buflen); 4575 if (writer->queue_buf == NULL) { 4576 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 4577 return 0; 4578 } 4579 4580 // init the writer queue. 4581 prot_queue_init(&writer->inbox, writer->queue_buf, 4582 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 4583 4584 // spin up the writer thread 4585 if (THREAD_CREATE(writer->thread_id, ndb_writer_thread, writer)) 4586 { 4587 fprintf(stderr, "ndb writer thread failed to create\n"); 4588 return 0; 4589 } 4590 4591 return 1; 4592 } 4593 4594 // initialize the ingester queue and then spawn the thread 4595 static int ndb_ingester_init(struct ndb_ingester *ingester, 4596 struct ndb_lmdb *lmdb, 4597 struct prot_queue *writer_inbox, 4598 const struct ndb_config *config) 4599 { 4600 int elem_size, num_elems; 4601 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 4602 4603 // TODO: configurable queue sizes 4604 elem_size = sizeof(struct ndb_ingester_msg); 4605 num_elems = DEFAULT_QUEUE_SIZE; 4606 4607 ingester->writer_inbox = writer_inbox; 4608 ingester->lmdb = lmdb; 4609 ingester->flags = config->flags; 4610 ingester->filter = config->ingest_filter; 4611 ingester->filter_context = config->filter_context; 4612 4613 if (!threadpool_init(&ingester->tp, config->ingester_threads, 4614 elem_size, num_elems, &quit_msg, 
ingester, 4615 ndb_ingester_thread)) 4616 { 4617 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 4618 return 0; 4619 } 4620 4621 return 1; 4622 } 4623 4624 static int ndb_writer_destroy(struct ndb_writer *writer) 4625 { 4626 struct ndb_writer_msg msg; 4627 4628 // kill thread 4629 msg.type = NDB_WRITER_QUIT; 4630 ndb_debug("writer: pushing quit message\n"); 4631 if (!prot_queue_push(&writer->inbox, &msg)) { 4632 // queue is too full to push quit message. just kill it. 4633 ndb_debug("writer: terminating thread\n"); 4634 THREAD_TERMINATE(writer->thread_id); 4635 } else { 4636 ndb_debug("writer: joining thread\n"); 4637 THREAD_FINISH(writer->thread_id); 4638 } 4639 4640 // cleanup 4641 ndb_debug("writer: cleaning up protected queue\n"); 4642 prot_queue_destroy(&writer->inbox); 4643 4644 free(writer->queue_buf); 4645 4646 return 1; 4647 } 4648 4649 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 4650 { 4651 threadpool_destroy(&ingester->tp); 4652 return 1; 4653 } 4654 4655 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 4656 { 4657 int rc; 4658 MDB_txn *txn; 4659 4660 if ((rc = mdb_env_create(&lmdb->env))) { 4661 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 4662 return 0; 4663 } 4664 4665 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 4666 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 4667 return 0; 4668 } 4669 4670 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 4671 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 4672 return 0; 4673 } 4674 4675 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 4676 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 4677 return 0; 4678 } 4679 4680 // Initialize DBs 4681 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 4682 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 4683 return 0; 4684 } 4685 4686 // note flatbuffer db 4687 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | 
MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 4688 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 4689 return 0; 4690 } 4691 4692 // note metadata db 4693 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 4694 fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); 4695 return 0; 4696 } 4697 4698 // profile flatbuffer db 4699 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 4700 fprintf(stderr, "mdb_dbi_open profile failed, error %d\n", rc); 4701 return 0; 4702 } 4703 4704 // profile search db 4705 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 4706 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 4707 return 0; 4708 } 4709 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 4710 4711 // ndb metadata (db version, etc) 4712 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 4713 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 4714 return 0; 4715 } 4716 4717 // profile last fetches 4718 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 4719 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 4720 return 0; 4721 } 4722 4723 // id+ts index flags 4724 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 4725 4726 // index dbs 4727 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 4728 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 4729 return 0; 4730 } 4731 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 4732 4733 if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 4734 fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc)); 4735 return 0; 4736 } 4737 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], 
ndb_tsid_compare); 4738 4739 if ((rc = mdb_dbi_open(txn, "note_kind", 4740 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4741 &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 4742 fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc)); 4743 return 0; 4744 } 4745 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_ts_compare); 4746 4747 if ((rc = mdb_dbi_open(txn, "note_pubkey", 4748 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4749 &lmdb->dbs[NDB_DB_NOTE_PUBKEY]))) { 4750 fprintf(stderr, "mdb_dbi_open note_pubkey failed: %s\n", mdb_strerror(rc)); 4751 return 0; 4752 } 4753 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY], ndb_tsid_compare); 4754 4755 if ((rc = mdb_dbi_open(txn, "note_pubkey_kind", 4756 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 4757 &lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND]))) { 4758 fprintf(stderr, "mdb_dbi_open note_pubkey_kind failed: %s\n", mdb_strerror(rc)); 4759 return 0; 4760 } 4761 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], ndb_id_u64_ts_compare); 4762 4763 if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 4764 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 4765 fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc)); 4766 return 0; 4767 } 4768 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 4769 4770 if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY, 4771 &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) { 4772 fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc)); 4773 return 0; 4774 } 4775 4776 if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, 4777 &lmdb->dbs[NDB_DB_NOTE_TAGS]))) { 4778 fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc)); 4779 return 0; 4780 } 4781 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare); 4782 4783 // Commit the transaction 4784 if ((rc = mdb_txn_commit(txn))) { 4785 fprintf(stderr, "mdb_txn_commit 
failed, error %d\n", rc); 4786 return 0; 4787 } 4788 4789 return 1; 4790 } 4791 4792 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 4793 { 4794 struct ndb_writer_msg msg; 4795 msg.type = NDB_WRITER_DBMETA; 4796 msg.ndb_meta.version = version; 4797 return ndb_writer_queue_msg(&ndb->writer, &msg); 4798 } 4799 4800 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, 4801 void *sub_cb_ctx) 4802 { 4803 monitor->num_subscriptions = 0; 4804 monitor->sub_cb = cb; 4805 monitor->sub_cb_ctx = sub_cb_ctx; 4806 pthread_mutex_init(&monitor->mutex, NULL); 4807 } 4808 4809 void ndb_filter_group_destroy(struct ndb_filter_group *group) 4810 { 4811 struct ndb_filter *filter; 4812 int i; 4813 for (i = 0; i < group->num_filters; i++) { 4814 filter = &group->filters[i]; 4815 ndb_filter_destroy(filter); 4816 } 4817 } 4818 4819 static void ndb_subscription_destroy(struct ndb_subscription *sub) 4820 { 4821 ndb_filter_group_destroy(&sub->group); 4822 // this was malloc'd inside ndb_subscribe 4823 free(sub->inbox.buf); 4824 prot_queue_destroy(&sub->inbox); 4825 sub->subid = 0; 4826 } 4827 4828 static void ndb_monitor_destroy(struct ndb_monitor *monitor) 4829 { 4830 int i; 4831 4832 ndb_monitor_lock(monitor); 4833 4834 for (i = 0; i < monitor->num_subscriptions; i++) { 4835 ndb_subscription_destroy(&monitor->subscriptions[i]); 4836 } 4837 4838 monitor->num_subscriptions = 0; 4839 4840 ndb_monitor_unlock(monitor); 4841 4842 pthread_mutex_destroy(&monitor->mutex); 4843 } 4844 4845 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config) 4846 { 4847 struct ndb *ndb; 4848 //MDB_dbi ind_id; // TODO: ind_pk, etc 4849 4850 ndb = *pndb = calloc(1, sizeof(struct ndb)); 4851 ndb->flags = config->flags; 4852 4853 if (ndb == NULL) { 4854 fprintf(stderr, "ndb_init: malloc failed\n"); 4855 return 0; 4856 } 4857 4858 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 4859 return 0; 4860 4861 ndb_monitor_init(&ndb->monitor, 
config->sub_cb, config->sub_cb_ctx); 4862 4863 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags)) { 4864 fprintf(stderr, "ndb_writer_init failed\n"); 4865 return 0; 4866 } 4867 4868 if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) { 4869 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 4870 config->ingester_threads); 4871 return 0; 4872 } 4873 4874 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) { 4875 struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE }; 4876 ndb_writer_queue_msg(&ndb->writer, &msg); 4877 } 4878 4879 // Initialize LMDB environment and spin up threads 4880 return 1; 4881 } 4882 4883 void ndb_destroy(struct ndb *ndb) 4884 { 4885 if (ndb == NULL) 4886 return; 4887 4888 // ingester depends on writer and must be destroyed first 4889 ndb_debug("destroying ingester\n"); 4890 ndb_ingester_destroy(&ndb->ingester); 4891 ndb_debug("destroying writer\n"); 4892 ndb_writer_destroy(&ndb->writer); 4893 ndb_debug("destroying monitor\n"); 4894 ndb_monitor_destroy(&ndb->monitor); 4895 4896 ndb_debug("closing env\n"); 4897 mdb_env_close(ndb->lmdb.env); 4898 4899 ndb_debug("ndb destroyed\n"); 4900 free(ndb); 4901 } 4902 4903 // Process a nostr event from a client 4904 // 4905 // ie: ["EVENT", {"content":"..."} ...] 4906 // 4907 // The client-sent variation of ndb_process_event 4908 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 4909 { 4910 return ndb_ingest_event(&ndb->ingester, json, len, 1); 4911 } 4912 4913 // Process anostr event from a relay, 4914 // 4915 // ie: ["EVENT", "subid", {"content":"..."}...] 4916 // 4917 // This function returns as soon as possible, first copying the passed 4918 // json and then queueing it up for processing. Worker threads then take 4919 // the json and process it. 4920 // 4921 // Processing: 4922 // 4923 // 1. The event is parsed into ndb_notes and the signature is validated 4924 // 2. 
A quick lookup is made on the database to see if we already have 4925 // the note id, if we do we don't need to waste time on json parsing 4926 // or note validation. 4927 // 3. Once validation is done we pass it to the writer queue for writing 4928 // to LMDB. 4929 // 4930 int ndb_process_event(struct ndb *ndb, const char *json, int json_len) 4931 { 4932 return ndb_ingest_event(&ndb->ingester, json, json_len, 0); 4933 } 4934 4935 4936 int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client) 4937 { 4938 const char *start, *end, *very_end; 4939 start = ldjson; 4940 end = start + json_len; 4941 very_end = ldjson + json_len; 4942 int (* process)(struct ndb *, const char *, int); 4943 #if DEBUG 4944 int processed = 0; 4945 #endif 4946 process = client ? ndb_process_client_event : ndb_process_event; 4947 4948 while ((end = fast_strchr(start, '\n', very_end - start))) { 4949 //printf("processing '%.*s'\n", (int)(end-start), start); 4950 if (!process(ndb, start, end - start)) { 4951 ndb_debug("ndb_process_client_event failed\n"); 4952 return 0; 4953 } 4954 start = end + 1; 4955 #if DEBUG 4956 processed++; 4957 #endif 4958 } 4959 4960 #if DEBUG 4961 ndb_debug("ndb_process_events: processed %d events\n", processed); 4962 #endif 4963 4964 return 1; 4965 } 4966 4967 #ifndef _WIN32 4968 // TODO: windows 4969 int ndb_process_events_stream(struct ndb *ndb, FILE* fp) 4970 { 4971 char *line = NULL; 4972 size_t len = 0; 4973 ssize_t nread; 4974 4975 while ((nread = getline(&line, &len, fp)) != -1) { 4976 if (line == NULL) 4977 break; 4978 ndb_process_event(ndb, line, len); 4979 } 4980 4981 if (line) 4982 free(line); 4983 4984 return 1; 4985 } 4986 #endif 4987 4988 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) 4989 { 4990 return _ndb_process_events(ndb, ldjson, json_len, 1); 4991 } 4992 4993 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len) 4994 { 4995 return _ndb_process_events(ndb, 
ldjson, json_len, 0); 4996 } 4997 4998 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag) 4999 { 5000 return cursor_push_u16(cur, tag->count); 5001 } 5002 5003 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, 5004 size_t bufsize) 5005 { 5006 struct ndb_note *note; 5007 int half, size, str_indices_size; 5008 5009 // come on bruh 5010 if (bufsize < sizeof(struct ndb_note) * 2) 5011 return 0; 5012 5013 str_indices_size = bufsize / 32; 5014 size = bufsize - str_indices_size; 5015 half = size / 2; 5016 5017 //debug("size %d half %d str_indices %d\n", size, half, str_indices_size); 5018 5019 // make a safe cursor of our available memory 5020 make_cursor(buf, buf + bufsize, &builder->mem); 5021 5022 note = builder->note = (struct ndb_note *)buf; 5023 5024 // take slices of the memory into subcursors 5025 if (!(cursor_slice(&builder->mem, &builder->note_cur, half) && 5026 cursor_slice(&builder->mem, &builder->strings, half) && 5027 cursor_slice(&builder->mem, &builder->str_indices, str_indices_size))) { 5028 return 0; 5029 } 5030 5031 memset(note, 0, sizeof(*note)); 5032 builder->note_cur.p += sizeof(*note); 5033 5034 note->strings = builder->strings.start - buf; 5035 note->version = 1; 5036 5037 return 1; 5038 } 5039 5040 5041 5042 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 5043 const char *json, int json_len, 5044 unsigned char *buf, int bufsize) 5045 { 5046 int half = bufsize / 2; 5047 5048 unsigned char *tok_start = buf + half; 5049 unsigned char *tok_end = buf + bufsize; 5050 5051 p->toks = (jsmntok_t*)tok_start; 5052 p->toks_end = (jsmntok_t*)tok_end; 5053 p->num_tokens = 0; 5054 p->json = json; 5055 p->json_len = json_len; 5056 5057 // ndb_builder gets the first half of the buffer, and jsmn gets the 5058 // second half. I like this way of alloating memory (without actually 5059 // dynamically allocating memory). You get one big chunk upfront and 5060 // then submodules can recursively subdivide it. 
Maybe you could do 5061 // something even more clever like golden-ratio style subdivision where 5062 // the more important stuff gets a larger chunk and then it spirals 5063 // downward into smaller chunks. Thanks for coming to my TED talk. 5064 5065 if (!ndb_builder_init(&p->builder, buf, half)) 5066 return 0; 5067 5068 jsmn_init(&p->json_parser); 5069 5070 return 1; 5071 } 5072 5073 static inline int ndb_json_parser_parse(struct ndb_json_parser *p, 5074 struct ndb_id_cb *cb) 5075 { 5076 jsmntok_t *tok; 5077 int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); 5078 int res = 5079 jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); 5080 5081 // got an ID! 5082 if (res == -42) { 5083 tok = &p->toks[p->json_parser.toknext-1]; 5084 5085 switch (cb->fn(cb->data, p->json + tok->start)) { 5086 case NDB_IDRES_CONT: 5087 res = jsmn_parse(&p->json_parser, p->json, p->json_len, 5088 p->toks, cap, 0); 5089 break; 5090 case NDB_IDRES_STOP: 5091 return -42; 5092 } 5093 } else if (res == 0) { 5094 return 0; 5095 } 5096 5097 p->num_tokens = res; 5098 p->i = 0; 5099 5100 return 1; 5101 } 5102 5103 static inline int toksize(jsmntok_t *tok) 5104 { 5105 return tok->end - tok->start; 5106 } 5107 5108 5109 5110 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2) 5111 { 5112 switch (c2) { 5113 case 't': return cursor_push_byte(cur, '\t'); 5114 case 'n': return cursor_push_byte(cur, '\n'); 5115 case 'r': return cursor_push_byte(cur, '\r'); 5116 case 'b': return cursor_push_byte(cur, '\b'); 5117 case 'f': return cursor_push_byte(cur, '\f'); 5118 case '\\': return cursor_push_byte(cur, '\\'); 5119 case '/': return cursor_push_byte(cur, '/'); 5120 case '"': return cursor_push_byte(cur, '"'); 5121 case 'u': 5122 // these aren't handled yet 5123 return 0; 5124 default: 5125 return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2); 5126 } 5127 } 5128 5129 static int cursor_push_escaped_char(struct cursor 
*cur, char c) 5130 { 5131 switch (c) { 5132 case '"': return cursor_push_str(cur, "\\\""); 5133 case '\\': return cursor_push_str(cur, "\\\\"); 5134 case '\b': return cursor_push_str(cur, "\\b"); 5135 case '\f': return cursor_push_str(cur, "\\f"); 5136 case '\n': return cursor_push_str(cur, "\\n"); 5137 case '\r': return cursor_push_str(cur, "\\r"); 5138 case '\t': return cursor_push_str(cur, "\\t"); 5139 // TODO: \u hex hex hex hex 5140 } 5141 return cursor_push_byte(cur, c); 5142 } 5143 5144 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len) 5145 { 5146 int i; 5147 5148 if (len % 2 != 0) 5149 return 0; 5150 5151 if (!cursor_push_byte(cur, '"')) 5152 return 0; 5153 5154 for (i = 0; i < len; i++) { 5155 unsigned int c = ((const unsigned char *)buf)[i]; 5156 if (!cursor_push_byte(cur, hexchar(c >> 4))) 5157 return 0; 5158 if (!cursor_push_byte(cur, hexchar(c & 0xF))) 5159 return 0; 5160 } 5161 5162 if (!cursor_push_byte(cur, '"')) 5163 return 0; 5164 5165 return 1; 5166 } 5167 5168 static int cursor_push_jsonstr(struct cursor *cur, const char *str) 5169 { 5170 int i; 5171 int len; 5172 5173 len = strlen(str); 5174 5175 if (!cursor_push_byte(cur, '"')) 5176 return 0; 5177 5178 for (i = 0; i < len; i++) { 5179 if (!cursor_push_escaped_char(cur, str[i])) 5180 return 0; 5181 } 5182 5183 if (!cursor_push_byte(cur, '"')) 5184 return 0; 5185 5186 return 1; 5187 } 5188 5189 5190 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str) 5191 { 5192 if (str.flag == NDB_PACKED_ID) 5193 return cursor_push_hex_str(cur, str.id, 32); 5194 5195 return cursor_push_jsonstr(cur, str.str); 5196 } 5197 5198 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note, 5199 struct ndb_tag *tag) 5200 { 5201 int i; 5202 5203 if (!cursor_push_byte(cur, '[')) 5204 return 0; 5205 5206 for (i = 0; i < tag->count; i++) { 5207 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i))) 5208 return 0; 5209 if (i != tag->count-1 
&& !cursor_push_byte(cur, ',')) 5210 return 0; 5211 } 5212 5213 return cursor_push_byte(cur, ']'); 5214 } 5215 5216 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 5217 { 5218 int i; 5219 struct ndb_iterator iter, *it = &iter; 5220 ndb_tags_iterate_start(note, it); 5221 5222 if (!cursor_push_byte(cur, '[')) 5223 return 0; 5224 5225 i = 0; 5226 while (ndb_tags_iterate_next(it)) { 5227 if (!cursor_push_json_tag(cur, note, it->tag)) 5228 return 0; 5229 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 5230 return 0; 5231 i++; 5232 } 5233 5234 if (!cursor_push_byte(cur, ']')) 5235 return 0; 5236 5237 return 1; 5238 } 5239 5240 static int ndb_event_commitment(struct ndb_note *ev, unsigned char *buf, int buflen) 5241 { 5242 char timebuf[16] = {0}; 5243 char kindbuf[16] = {0}; 5244 char pubkey[65]; 5245 struct cursor cur; 5246 int ok; 5247 5248 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey)) 5249 return 0; 5250 5251 make_cursor(buf, buf + buflen, &cur); 5252 5253 // TODO: update in 2106 ... 
5254 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 5255 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 5256 5257 ok = 5258 cursor_push_str(&cur, "[0,\"") && 5259 cursor_push_str(&cur, pubkey) && 5260 cursor_push_str(&cur, "\",") && 5261 cursor_push_str(&cur, timebuf) && 5262 cursor_push_str(&cur, ",") && 5263 cursor_push_str(&cur, kindbuf) && 5264 cursor_push_str(&cur, ",") && 5265 cursor_push_json_tags(&cur, ev) && 5266 cursor_push_str(&cur, ",") && 5267 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 5268 cursor_push_str(&cur, "]"); 5269 5270 if (!ok) 5271 return 0; 5272 5273 return cur.p - cur.start; 5274 } 5275 5276 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len) 5277 { 5278 int i; 5279 unsigned char chr; 5280 if (c->p + (len * 2) >= c->end) 5281 return 0; 5282 5283 for (i = 0; i < len; i++) { 5284 chr = bytes[i]; 5285 5286 *(c->p++) = hexchar(chr >> 4); 5287 *(c->p++) = hexchar(chr & 0xF); 5288 } 5289 5290 return 1; 5291 } 5292 5293 static int cursor_push_int_str(struct cursor *c, uint64_t num) 5294 { 5295 char timebuf[16] = {0}; 5296 snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num); 5297 return cursor_push_str(c, timebuf); 5298 } 5299 5300 int ndb_note_json(struct ndb_note *note, char *buf, int buflen) 5301 { 5302 struct cursor cur, *c = &cur; 5303 5304 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur); 5305 5306 int ok = cursor_push_str(c, "{\"id\":\"") && 5307 cursor_push_hex(c, ndb_note_id(note), 32) && 5308 cursor_push_str(c, "\",\"pubkey\":\"") && 5309 cursor_push_hex(c, ndb_note_pubkey(note), 32) && 5310 cursor_push_str(c, "\",\"created_at\":") && 5311 cursor_push_int_str(c, ndb_note_created_at(note)) && 5312 cursor_push_str(c, ",\"kind\":") && 5313 cursor_push_int_str(c, ndb_note_kind(note)) && 5314 cursor_push_str(c, ",\"tags\":") && 5315 cursor_push_json_tags(c, note) && 5316 cursor_push_str(c, ",\"content\":") && 5317 cursor_push_jsonstr(c, 
ndb_note_content(note)) && 5318 cursor_push_str(c, ",\"sig\":\"") && 5319 cursor_push_hex(c, ndb_note_sig(note), 64) && 5320 cursor_push_c_str(c, "\"}"); 5321 5322 if (!ok) { 5323 return 0; 5324 } 5325 5326 return cur.p - cur.start; 5327 } 5328 5329 static int cursor_push_json_elem_array(struct cursor *cur, 5330 const struct ndb_filter *filter, 5331 struct ndb_filter_elements *elems) 5332 { 5333 int i; 5334 unsigned char *id; 5335 const char *str; 5336 uint64_t val; 5337 5338 if (!cursor_push_byte(cur, '[')) 5339 return 0; 5340 5341 for (i = 0; i < elems->count; i++) { 5342 5343 switch (elems->field.elem_type) { 5344 case NDB_ELEMENT_STRING: 5345 str = ndb_filter_get_string_element(filter, elems, i); 5346 if (!cursor_push_jsonstr(cur, str)) 5347 return 0; 5348 break; 5349 case NDB_ELEMENT_ID: 5350 id = ndb_filter_get_id_element(filter, elems, i); 5351 if (!cursor_push_hex_str(cur, id, 32)) 5352 return 0; 5353 break; 5354 case NDB_ELEMENT_INT: 5355 val = ndb_filter_get_int_element(elems, i); 5356 if (!cursor_push_int_str(cur, val)) 5357 return 0; 5358 break; 5359 case NDB_ELEMENT_UNKNOWN: 5360 ndb_debug("unknown element in cursor_push_json_elem_array"); 5361 return 0; 5362 } 5363 5364 if (i != elems->count-1) { 5365 if (!cursor_push_byte(cur, ',')) 5366 return 0; 5367 } 5368 } 5369 5370 if (!cursor_push_str(cur, "]")) 5371 return 0; 5372 5373 return 1; 5374 } 5375 5376 int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen) 5377 { 5378 struct cursor cur, *c = &cur; 5379 struct ndb_filter_elements *elems; 5380 int i; 5381 5382 if (!filter->finalized) { 5383 ndb_debug("filter not finalized in ndb_filter_json\n"); 5384 return 0; 5385 } 5386 5387 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c); 5388 5389 if (!cursor_push_str(c, "{")) 5390 return 0; 5391 5392 for (i = 0; i < filter->num_elements; i++) { 5393 elems = ndb_filter_get_elements(filter, i); 5394 switch (elems->field.type) { 5395 case NDB_FILTER_IDS: 5396 if 
(!cursor_push_str(c, "\"ids\":")) 5397 return 0; 5398 if (!cursor_push_json_elem_array(c, filter, elems)) 5399 return 0; 5400 break; 5401 case NDB_FILTER_AUTHORS: 5402 if (!cursor_push_str(c, "\"authors\":")) 5403 return 0; 5404 if (!cursor_push_json_elem_array(c, filter, elems)) 5405 return 0; 5406 break; 5407 case NDB_FILTER_KINDS: 5408 if (!cursor_push_str(c, "\"kinds\":")) 5409 return 0; 5410 if (!cursor_push_json_elem_array(c, filter, elems)) 5411 return 0; 5412 break; 5413 case NDB_FILTER_TAGS: 5414 if (!cursor_push_str(c, "\"#")) 5415 return 0; 5416 if (!cursor_push_byte(c, elems->field.tag)) 5417 return 0; 5418 if (!cursor_push_str(c, "\":")) 5419 return 0; 5420 if (!cursor_push_json_elem_array(c, filter, elems)) 5421 return 0; 5422 break; 5423 case NDB_FILTER_SINCE: 5424 if (!cursor_push_str(c, "\"since\":")) 5425 return 0; 5426 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5427 return 0; 5428 break; 5429 case NDB_FILTER_UNTIL: 5430 if (!cursor_push_str(c, "\"until\":")) 5431 return 0; 5432 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5433 return 0; 5434 break; 5435 case NDB_FILTER_LIMIT: 5436 if (!cursor_push_str(c, "\"limit\":")) 5437 return 0; 5438 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 5439 return 0; 5440 break; 5441 } 5442 5443 if (i != filter->num_elements-1) { 5444 if (!cursor_push_byte(c, ',')) { 5445 return 0; 5446 } 5447 } 5448 5449 } 5450 5451 if (!cursor_push_c_str(c, "}")) 5452 return 0; 5453 5454 return cur.p - cur.start; 5455 } 5456 5457 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 5458 int len; 5459 5460 if (!(len = ndb_event_commitment(note, buf, buflen))) 5461 return 0; 5462 5463 //fprintf(stderr, "%.*s\n", len, buf); 5464 5465 sha256((struct sha256*)note->id, buf, len); 5466 5467 return 1; 5468 } 5469 5470 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 5471 unsigned char sig[64]) 5472 { 5473 unsigned char aux[32]; 
5474 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 5475 5476 if (!fill_random(aux, sizeof(aux))) 5477 return 0; 5478 5479 secp256k1_context *ctx = 5480 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 5481 5482 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 5483 } 5484 5485 int ndb_create_keypair(struct ndb_keypair *kp) 5486 { 5487 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 5488 secp256k1_xonly_pubkey pubkey; 5489 5490 secp256k1_context *ctx = 5491 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 5492 5493 /* Try to create a keypair with a valid context, it should only 5494 * fail if the secret key is zero or out of range. */ 5495 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 5496 return 0; 5497 5498 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 5499 return 0; 5500 5501 /* Serialize the public key. Should always return 1 for a valid public key. */ 5502 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 5503 } 5504 5505 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 5506 { 5507 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 5508 fprintf(stderr, "could not hex decode secret key\n"); 5509 return 0; 5510 } 5511 5512 return ndb_create_keypair(keypair); 5513 } 5514 5515 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 5516 struct ndb_keypair *keypair) 5517 { 5518 int strings_len = builder->strings.p - builder->strings.start; 5519 unsigned char *note_end = builder->note_cur.p + strings_len; 5520 int total_size = note_end - builder->note_cur.start; 5521 5522 // move the strings buffer next to the end of our ndb_note 5523 memmove(builder->note_cur.p, builder->strings.start, strings_len); 5524 5525 // set the strings location 5526 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 5527 5528 // record the total size 5529 //builder->note->size = total_size; 5530 5531 *note = builder->note; 5532 5533 
// generate id and sign if we're building this manually 5534 if (keypair) { 5535 // use the remaining memory for building our id buffer 5536 unsigned char *end = builder->mem.end; 5537 unsigned char *start = (unsigned char*)(*note) + total_size; 5538 5539 ndb_builder_set_pubkey(builder, keypair->pubkey); 5540 5541 if (!ndb_calculate_id(builder->note, start, end - start)) 5542 return 0; 5543 5544 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 5545 return 0; 5546 } 5547 5548 // make sure we're aligned as a whole 5549 total_size = (total_size + 7) & ~7; 5550 assert((total_size % 8) == 0); 5551 return total_size; 5552 } 5553 5554 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 5555 { 5556 return builder->note; 5557 } 5558 5559 static union ndb_packed_str ndb_offset_str(uint32_t offset) 5560 { 5561 // ensure accidents like -1 don't corrupt our packed_str 5562 union ndb_packed_str str; 5563 // most significant byte is reserved for ndb_packtype 5564 str.offset = offset & 0xFFFFFF; 5565 return str; 5566 } 5567 5568 5569 /// find an existing string via str_indices. these indices only exist in the 5570 /// builder phase just for this purpose. 
5571 static inline int ndb_builder_find_str(struct ndb_builder *builder, 5572 const char *str, int len, 5573 union ndb_packed_str *pstr) 5574 { 5575 // find existing matching string to avoid duplicate strings 5576 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 5577 for (int i = 0; i < indices; i++) { 5578 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 5579 const char *some_str = (const char*)builder->strings.start + index; 5580 5581 if (!memcmp(some_str, str, len) && some_str[len] == '\0') { 5582 // found an existing matching str, use that index 5583 *pstr = ndb_offset_str(index); 5584 return 1; 5585 } 5586 } 5587 5588 return 0; 5589 } 5590 5591 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 5592 int len, union ndb_packed_str *pstr) 5593 { 5594 uint32_t loc; 5595 5596 // no string found, push a new one 5597 loc = builder->strings.p - builder->strings.start; 5598 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 5599 cursor_push_byte(&builder->strings, '\0'))) { 5600 return 0; 5601 } 5602 5603 *pstr = ndb_offset_str(loc); 5604 5605 // record in builder indices. ignore return value, if we can't cache it 5606 // then whatever 5607 cursor_push_u32(&builder->str_indices, loc); 5608 5609 return 1; 5610 } 5611 5612 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 5613 unsigned char *id, 5614 union ndb_packed_str *pstr) 5615 { 5616 // Don't both find id duplicates. very rarely are they duplicated 5617 // and it slows things down quite a bit. If we really care about this 5618 // We can switch to a hash table. 
5619 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 5620 // pstr->packed.flag = NDB_PACKED_ID; 5621 // return 1; 5622 //} 5623 5624 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 5625 pstr->packed.flag = NDB_PACKED_ID; 5626 return 1; 5627 } 5628 5629 return 0; 5630 } 5631 5632 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2) 5633 { 5634 union ndb_packed_str str; 5635 str.packed.flag = NDB_PACKED_STR; 5636 str.packed.str[0] = c1; 5637 str.packed.str[1] = c2; 5638 str.packed.str[2] = '\0'; 5639 return str; 5640 } 5641 5642 static union ndb_packed_str ndb_char_to_packed_str(char c) 5643 { 5644 union ndb_packed_str str; 5645 str.packed.flag = NDB_PACKED_STR; 5646 str.packed.str[0] = c; 5647 str.packed.str[1] = '\0'; 5648 return str; 5649 } 5650 5651 5652 /// Check for small strings to pack 5653 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 5654 const char *str, int len, 5655 union ndb_packed_str *pstr, 5656 int pack_ids) 5657 { 5658 unsigned char id_buf[32]; 5659 5660 if (len == 0) { 5661 *pstr = ndb_char_to_packed_str(0); 5662 return 1; 5663 } else if (len == 1) { 5664 *pstr = ndb_char_to_packed_str(str[0]); 5665 return 1; 5666 } else if (len == 2) { 5667 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 5668 return 1; 5669 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 5670 return ndb_builder_push_packed_id(builder, id_buf, pstr); 5671 } 5672 5673 return 0; 5674 } 5675 5676 5677 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 5678 const char *str, int len, 5679 union ndb_packed_str *pstr) 5680 { 5681 if (ndb_builder_find_str(builder, str, len, pstr)) 5682 return 1; 5683 5684 return ndb_builder_push_str(builder, str, len, pstr); 5685 } 5686 5687 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 5688 union ndb_packed_str *pstr, int pack_ids) 5689 { 5690 if (ndb_builder_try_compact_str(builder, str, len, pstr, 
pack_ids)) 5691 return 1; 5692 5693 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 5694 } 5695 5696 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 5697 int len) 5698 { 5699 int pack_ids = 0; 5700 builder->note->content_length = len; 5701 return ndb_builder_make_str(builder, content, len, 5702 &builder->note->content, pack_ids); 5703 } 5704 5705 5706 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 5707 const char *s) 5708 { 5709 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 5710 memcmp(json + tok->start, s, tok_len) == 0) { 5711 return 1; 5712 } 5713 return 0; 5714 } 5715 5716 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 5717 union ndb_packed_str offset) 5718 { 5719 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 5720 return 0; 5721 builder->current_tag->count++; 5722 return 1; 5723 } 5724 5725 /// Unescape and push json strings 5726 static int ndb_builder_make_json_str(struct ndb_builder *builder, 5727 const char *str, int len, 5728 union ndb_packed_str *pstr, 5729 int *written, int pack_ids) 5730 { 5731 // let's not care about de-duping these. we should just unescape 5732 // in-place directly into the strings table. 
5733 if (written) 5734 *written = len; 5735 5736 const char *p, *end, *start; 5737 unsigned char *builder_start; 5738 5739 // always try compact strings first 5740 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 5741 return 1; 5742 5743 end = str + len; 5744 start = str; // Initialize start to the beginning of the string 5745 5746 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 5747 builder_start = builder->strings.p; 5748 5749 for (p = str; p < end; p++) { 5750 if (*p == '\\' && p+1 < end) { 5751 // Push the chunk of unescaped characters before this escape sequence 5752 if (start < p && !cursor_push(&builder->strings, 5753 (unsigned char *)start, 5754 p - start)) { 5755 return 0; 5756 } 5757 5758 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 5759 return 0; 5760 5761 p++; // Skip the character following the backslash 5762 start = p + 1; // Update the start pointer to the next character 5763 } 5764 } 5765 5766 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 5767 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 5768 p - start)) { 5769 return 0; 5770 } 5771 5772 if (written) 5773 *written = builder->strings.p - builder_start; 5774 5775 // TODO: dedupe these!? 
5776 return cursor_push_byte(&builder->strings, '\0'); 5777 } 5778 5779 static int ndb_builder_push_json_tag(struct ndb_builder *builder, 5780 const char *str, int len) 5781 { 5782 union ndb_packed_str pstr; 5783 int pack_ids = 1; 5784 if (!ndb_builder_make_json_str(builder, str, len, &pstr, NULL, pack_ids)) 5785 return 0; 5786 return ndb_builder_finalize_tag(builder, pstr); 5787 } 5788 5789 // Push a json array into an ndb tag ["p", "abcd..."] -> struct ndb_tag 5790 static int ndb_builder_tag_from_json_array(struct ndb_json_parser *p, 5791 jsmntok_t *array) 5792 { 5793 jsmntok_t *str_tok; 5794 const char *str; 5795 5796 if (array->size == 0) 5797 return 0; 5798 5799 if (!ndb_builder_new_tag(&p->builder)) 5800 return 0; 5801 5802 for (int i = 0; i < array->size; i++) { 5803 str_tok = &array[i+1]; 5804 str = p->json + str_tok->start; 5805 5806 if (!ndb_builder_push_json_tag(&p->builder, str, 5807 toksize(str_tok))) { 5808 return 0; 5809 } 5810 } 5811 5812 return 1; 5813 } 5814 5815 // Push json tags into ndb data 5816 // [["t", "hashtag"], ["p", "abcde..."]] -> struct ndb_tags 5817 static inline int ndb_builder_process_json_tags(struct ndb_json_parser *p, 5818 jsmntok_t *array) 5819 { 5820 jsmntok_t *tag = array; 5821 5822 if (array->size == 0) 5823 return 1; 5824 5825 for (int i = 0; i < array->size; i++) { 5826 if (!ndb_builder_tag_from_json_array(p, &tag[i+1])) 5827 return 0; 5828 tag += tag[i+1].size; 5829 } 5830 5831 return 1; 5832 } 5833 5834 static int parse_unsigned_int(const char *start, int len, unsigned int *num) 5835 { 5836 unsigned int number = 0; 5837 const char *p = start, *end = start + len; 5838 int digits = 0; 5839 5840 while (p < end) { 5841 char c = *p; 5842 5843 if (c < '0' || c > '9') 5844 break; 5845 5846 // Check for overflow 5847 char digit = c - '0'; 5848 if (number > (UINT_MAX - digit) / 10) 5849 return 0; // Overflow detected 5850 5851 number = number * 10 + digit; 5852 5853 p++; 5854 digits++; 5855 } 5856 5857 if (digits == 0) 5858 
return 0; 5859 5860 *num = number; 5861 return 1; 5862 } 5863 5864 int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, 5865 unsigned char *buf, int bufsize, struct ndb_id_cb *cb) 5866 { 5867 jsmntok_t *tok = NULL; 5868 int tok_len, res; 5869 struct ndb_json_parser parser; 5870 struct ndb_event *ev = &fce->event; 5871 5872 ndb_json_parser_init(&parser, json, len, buf, bufsize); 5873 5874 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 5875 return res; 5876 5877 if (parser.toks[0].type == JSMN_OBJECT) { 5878 ndb_debug("got raw json in client_event_from_json\n"); 5879 fce->evtype = NDB_FCE_EVENT; 5880 return ndb_parse_json_note(&parser, &ev->note); 5881 } 5882 5883 if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY) 5884 return 0; 5885 5886 parser.i = 1; 5887 tok = &parser.toks[parser.i++]; 5888 tok_len = toksize(tok); 5889 if (tok->type != JSMN_STRING) 5890 return 0; 5891 5892 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 5893 fce->evtype = NDB_FCE_EVENT; 5894 return ndb_parse_json_note(&parser, &ev->note); 5895 } 5896 5897 return 0; 5898 } 5899 5900 5901 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, 5902 unsigned char *buf, int bufsize, 5903 struct ndb_id_cb *cb) 5904 { 5905 jsmntok_t *tok = NULL; 5906 int tok_len, res; 5907 struct ndb_json_parser parser; 5908 struct ndb_event *ev = &tce->event; 5909 5910 tce->subid_len = 0; 5911 tce->subid = ""; 5912 5913 ndb_json_parser_init(&parser, json, len, buf, bufsize); 5914 5915 if ((res = ndb_json_parser_parse(&parser, cb)) < 0) 5916 return res; 5917 5918 if (parser.toks[0].type == JSMN_OBJECT) { 5919 ndb_debug("got raw json in ws_event_from_json\n"); 5920 tce->evtype = NDB_TCE_EVENT; 5921 return ndb_parse_json_note(&parser, &ev->note); 5922 } 5923 5924 if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) 5925 return 0; 5926 5927 parser.i = 1; 5928 tok = &parser.toks[parser.i++]; 5929 tok_len = toksize(tok); 5930 if 
(tok->type != JSMN_STRING) 5931 return 0; 5932 5933 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 5934 tce->evtype = NDB_TCE_EVENT; 5935 5936 tok = &parser.toks[parser.i++]; 5937 if (tok->type != JSMN_STRING) 5938 return 0; 5939 5940 tce->subid = json + tok->start; 5941 tce->subid_len = toksize(tok); 5942 5943 return ndb_parse_json_note(&parser, &ev->note); 5944 } else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 5945 tce->evtype = NDB_TCE_EOSE; 5946 5947 tok = &parser.toks[parser.i++]; 5948 if (tok->type != JSMN_STRING) 5949 return 0; 5950 5951 tce->subid = json + tok->start; 5952 tce->subid_len = toksize(tok); 5953 return 1; 5954 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 5955 if (parser.num_tokens != 5) 5956 return 0; 5957 5958 struct ndb_command_result *cr = &tce->command_result; 5959 5960 tce->evtype = NDB_TCE_OK; 5961 5962 tok = &parser.toks[parser.i++]; 5963 if (tok->type != JSMN_STRING) 5964 return 0; 5965 5966 tce->subid = json + tok->start; 5967 tce->subid_len = toksize(tok); 5968 5969 tok = &parser.toks[parser.i++]; 5970 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 5971 return 0; 5972 5973 cr->ok = (json + tok->start)[0] == 't'; 5974 5975 tok = &parser.toks[parser.i++]; 5976 if (tok->type != JSMN_STRING) 5977 return 0; 5978 5979 tce->command_result.msg = json + tok->start; 5980 tce->command_result.msglen = toksize(tok); 5981 5982 return 1; 5983 } else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) { 5984 tce->evtype = NDB_TCE_AUTH; 5985 5986 tok = &parser.toks[parser.i++]; 5987 if (tok->type != JSMN_STRING) 5988 return 0; 5989 5990 tce->subid = json + tok->start; 5991 tce->subid_len = toksize(tok); 5992 5993 return 1; 5994 } 5995 5996 return 0; 5997 } 5998 5999 static enum ndb_filter_fieldtype 6000 ndb_filter_parse_field(const char *tok, int len, char *tagchar) 6001 { 6002 *tagchar = 0; 6003 6004 if (len == 0) 6005 return 0; 6006 6007 if (len == 7 && !strncmp(tok, "authors", 7)) 
{ 6008 return NDB_FILTER_AUTHORS; 6009 } else if (len == 3 && !strncmp(tok, "ids", 3)) { 6010 return NDB_FILTER_IDS; 6011 } else if (len == 5 && !strncmp(tok, "kinds", 5)) { 6012 return NDB_FILTER_KINDS; 6013 } else if (len == 2 && tok[0] == '#') { 6014 *tagchar = tok[1]; 6015 return NDB_FILTER_TAGS; 6016 } else if (len == 5 && !strncmp(tok, "since", 5)) { 6017 return NDB_FILTER_SINCE; 6018 } else if (len == 5 && !strncmp(tok, "until", 5)) { 6019 return NDB_FILTER_UNTIL; 6020 } else if (len == 5 && !strncmp(tok, "limit", 5)) { 6021 return NDB_FILTER_LIMIT; 6022 } 6023 6024 return 0; 6025 } 6026 6027 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser, 6028 struct ndb_filter *filter) 6029 { 6030 jsmntok_t *tok; 6031 const char *start; 6032 unsigned char hexbuf[32]; 6033 int tok_len, i, size; 6034 6035 tok = &parser->toks[parser->i++]; 6036 6037 if (tok->type != JSMN_ARRAY) { 6038 ndb_debug("parse_json_ids: not an array\n"); 6039 return 0; 6040 } 6041 6042 size = tok->size; 6043 6044 for (i = 0; i < size; parser->i++, i++) { 6045 tok = &parser->toks[parser->i]; 6046 start = parser->json + tok->start; 6047 tok_len = toksize(tok); 6048 6049 if (tok->type != JSMN_STRING) { 6050 ndb_debug("parse_json_ids: not a string '%d'\n", tok->type); 6051 return 0; 6052 } 6053 6054 if (tok_len != 64) { 6055 ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start); 6056 return 0; 6057 } 6058 6059 // id 6060 if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) { 6061 ndb_debug("parse_json_ids: hex decode failed\n"); 6062 return 0; 6063 } 6064 6065 ndb_debug("adding id elem\n"); 6066 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6067 ndb_debug("parse_json_ids: failed to add id element\n"); 6068 return 0; 6069 } 6070 } 6071 6072 parser->i--; 6073 return 1; 6074 } 6075 6076 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser, 6077 struct ndb_filter *filter) 6078 { 6079 jsmntok_t *tok; 6080 const char *start; 6081 int tok_len; 6082 
unsigned char hexbuf[32]; 6083 enum ndb_generic_element_type typ; 6084 tok = NULL; 6085 int i, size; 6086 6087 tok = &parser->toks[parser->i++]; 6088 6089 if (tok->type != JSMN_ARRAY) 6090 return 0; 6091 6092 size = tok->size; 6093 6094 for (i = 0; i < size; i++, parser->i++) { 6095 tok = &parser->toks[parser->i]; 6096 start = parser->json + tok->start; 6097 tok_len = toksize(tok); 6098 6099 if (tok->type != JSMN_STRING) 6100 return 0; 6101 6102 if (i == 0) { 6103 if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) { 6104 typ = NDB_ELEMENT_ID; 6105 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6106 ndb_debug("failed to push id elem\n"); 6107 return 0; 6108 } 6109 } else { 6110 typ = NDB_ELEMENT_STRING; 6111 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6112 return 0; 6113 } 6114 } else if (typ == NDB_ELEMENT_ID) { 6115 if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf))) 6116 return 0; 6117 if (!ndb_filter_add_id_element(filter, hexbuf)) 6118 return 0; 6119 } else if (typ == NDB_ELEMENT_STRING) { 6120 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6121 return 0; 6122 } else { 6123 // ??? 
6124 return 0; 6125 } 6126 } 6127 6128 parser->i--; 6129 return 1; 6130 } 6131 6132 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser, 6133 struct ndb_filter *filter) 6134 { 6135 jsmntok_t *tok; 6136 const char *start; 6137 int tok_len; 6138 unsigned int value; 6139 6140 tok = &parser->toks[parser->i]; 6141 start = parser->json + tok->start; 6142 tok_len = toksize(tok); 6143 6144 if (tok->type != JSMN_PRIMITIVE) 6145 return 0; 6146 6147 if (!parse_unsigned_int(start, tok_len, &value)) 6148 return 0; 6149 6150 if (!ndb_filter_add_int_element(filter, (uint64_t)value)) 6151 return 0; 6152 6153 ndb_debug("added int elem %d\n", value); 6154 6155 return 1; 6156 } 6157 6158 6159 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser, 6160 struct ndb_filter *filter) 6161 { 6162 jsmntok_t *tok; 6163 int size, i; 6164 6165 tok = &parser->toks[parser->i++]; 6166 6167 if (tok->type != JSMN_ARRAY) 6168 return 0; 6169 6170 size = tok->size; 6171 6172 for (i = 0; i < size; parser->i++, i++) { 6173 if (!ndb_filter_parse_json_int(parser, filter)) 6174 return 0; 6175 } 6176 6177 parser->i--; 6178 return 1; 6179 } 6180 6181 6182 static int ndb_filter_parse_json(struct ndb_json_parser *parser, 6183 struct ndb_filter *filter) 6184 { 6185 jsmntok_t *tok = NULL; 6186 const char *json = parser->json; 6187 const char *start; 6188 char tag; 6189 int tok_len; 6190 enum ndb_filter_fieldtype field; 6191 6192 if (parser->toks[parser->i++].type != JSMN_OBJECT) 6193 return 0; 6194 6195 for (; parser->i < parser->num_tokens; parser->i++) { 6196 tok = &parser->toks[parser->i++]; 6197 start = json + tok->start; 6198 tok_len = toksize(tok); 6199 6200 if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) { 6201 ndb_debug("failed field '%.*s'\n", tok_len, start); 6202 continue; 6203 } 6204 6205 if (tag) { 6206 ndb_debug("starting tag field '%c'\n", tag); 6207 if (!ndb_filter_start_tag_field(filter, tag)) { 6208 ndb_debug("failed to start tag field '%c'\n", tag); 
6209 return 0; 6210 } 6211 } else if (!ndb_filter_start_field(filter, field)) { 6212 ndb_debug("field already started\n"); 6213 return 0; 6214 } 6215 6216 // we parsed a top-level field 6217 switch(field) { 6218 case NDB_FILTER_AUTHORS: 6219 case NDB_FILTER_IDS: 6220 if (!ndb_filter_parse_json_ids(parser, filter)) { 6221 ndb_debug("failed to parse filter ids/authors\n"); 6222 return 0; 6223 } 6224 break; 6225 case NDB_FILTER_SINCE: 6226 case NDB_FILTER_UNTIL: 6227 case NDB_FILTER_LIMIT: 6228 if (!ndb_filter_parse_json_int(parser, filter)) { 6229 ndb_debug("failed to parse filter since/until/limit\n"); 6230 return 0; 6231 } 6232 break; 6233 case NDB_FILTER_KINDS: 6234 if (!ndb_filter_parse_json_ints(parser, filter)) { 6235 ndb_debug("failed to parse filter kinds\n"); 6236 return 0; 6237 } 6238 break; 6239 case NDB_FILTER_TAGS: 6240 if (!ndb_filter_parse_json_elems(parser, filter)) { 6241 ndb_debug("failed to parse filter tags\n"); 6242 return 0; 6243 } 6244 break; 6245 } 6246 6247 ndb_filter_end_field(filter); 6248 } 6249 6250 return ndb_filter_end(filter); 6251 } 6252 6253 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 6254 { 6255 jsmntok_t *tok = NULL; 6256 unsigned char hexbuf[64]; 6257 const char *json = parser->json; 6258 const char *start; 6259 int i, tok_len, parsed; 6260 6261 parsed = 0; 6262 6263 if (parser->toks[parser->i].type != JSMN_OBJECT) 6264 return 0; 6265 6266 // TODO: build id buffer and verify at end 6267 6268 for (i = parser->i + 1; i < parser->num_tokens; i++) { 6269 tok = &parser->toks[i]; 6270 start = json + tok->start; 6271 tok_len = toksize(tok); 6272 6273 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 6274 if (tok_len == 0 || i + 1 >= parser->num_tokens) 6275 continue; 6276 6277 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 6278 // pubkey 6279 tok = &parser->toks[i+1]; 6280 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6281 parsed |= 
NDB_PARSED_PUBKEY; 6282 ndb_builder_set_pubkey(&parser->builder, hexbuf); 6283 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 6284 // id 6285 tok = &parser->toks[i+1]; 6286 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6287 parsed |= NDB_PARSED_ID; 6288 ndb_builder_set_id(&parser->builder, hexbuf); 6289 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 6290 // sig 6291 tok = &parser->toks[i+1]; 6292 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6293 parsed |= NDB_PARSED_SIG; 6294 ndb_builder_set_sig(&parser->builder, hexbuf); 6295 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 6296 // kind 6297 tok = &parser->toks[i+1]; 6298 start = json + tok->start; 6299 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6300 return 0; 6301 if (!parse_unsigned_int(start, toksize(tok), 6302 &parser->builder.note->kind)) 6303 return 0; 6304 parsed |= NDB_PARSED_KIND; 6305 } else if (start[0] == 'c') { 6306 if (jsoneq(json, tok, tok_len, "created_at")) { 6307 // created_at 6308 tok = &parser->toks[i+1]; 6309 start = json + tok->start; 6310 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6311 return 0; 6312 // TODO: update to int64 in 2106 ... 
xD 6313 unsigned int bigi; 6314 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 6315 return 0; 6316 parser->builder.note->created_at = bigi; 6317 parsed |= NDB_PARSED_CREATED_AT; 6318 } else if (jsoneq(json, tok, tok_len, "content")) { 6319 // content 6320 tok = &parser->toks[i+1]; 6321 union ndb_packed_str pstr; 6322 tok_len = toksize(tok); 6323 int written, pack_ids = 0; 6324 if (!ndb_builder_make_json_str(&parser->builder, 6325 json + tok->start, 6326 tok_len, &pstr, 6327 &written, pack_ids)) { 6328 ndb_debug("ndb_builder_make_json_str failed\n"); 6329 return 0; 6330 } 6331 parser->builder.note->content_length = written; 6332 parser->builder.note->content = pstr; 6333 parsed |= NDB_PARSED_CONTENT; 6334 } 6335 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 6336 tok = &parser->toks[i+1]; 6337 ndb_builder_process_json_tags(parser, tok); 6338 i += tok->size; 6339 parsed |= NDB_PARSED_TAGS; 6340 } 6341 } 6342 6343 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 6344 if (parsed != NDB_PARSED_ALL) 6345 return 0; 6346 6347 return ndb_builder_finalize(&parser->builder, note, NULL); 6348 } 6349 6350 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter, 6351 unsigned char *buf, int bufsize) 6352 { 6353 struct ndb_json_parser parser; 6354 int res; 6355 6356 if (filter->finalized) 6357 return 0; 6358 6359 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6360 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6361 return res; 6362 6363 if (parser.num_tokens < 1) 6364 return 0; 6365 6366 return ndb_filter_parse_json(&parser, filter); 6367 } 6368 6369 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 6370 unsigned char *buf, int bufsize) 6371 { 6372 struct ndb_json_parser parser; 6373 int res; 6374 6375 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6376 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6377 return res; 6378 6379 if 
(parser.num_tokens < 1) 6380 return 0; 6381 6382 return ndb_parse_json_note(&parser, note); 6383 } 6384 6385 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 6386 { 6387 memcpy(builder->note->pubkey, pubkey, 32); 6388 } 6389 6390 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 6391 { 6392 memcpy(builder->note->id, id, 32); 6393 } 6394 6395 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 6396 { 6397 memcpy(builder->note->sig, sig, 64); 6398 } 6399 6400 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 6401 { 6402 builder->note->kind = kind; 6403 } 6404 6405 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 6406 { 6407 builder->note->created_at = created_at; 6408 } 6409 6410 int ndb_builder_new_tag(struct ndb_builder *builder) 6411 { 6412 builder->note->tags.count++; 6413 struct ndb_tag tag = {0}; 6414 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 6415 return cursor_push_tag(&builder->note_cur, &tag); 6416 } 6417 6418 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 6419 { 6420 counts->count = 0; 6421 counts->key_size = 0; 6422 counts->value_size = 0; 6423 } 6424 6425 static void ndb_stat_init(struct ndb_stat *stat) 6426 { 6427 // init stats 6428 int i; 6429 6430 for (i = 0; i < NDB_CKIND_COUNT; i++) { 6431 ndb_stat_counts_init(&stat->common_kinds[i]); 6432 } 6433 6434 for (i = 0; i < NDB_DBS; i++) { 6435 ndb_stat_counts_init(&stat->dbs[i]); 6436 } 6437 6438 ndb_stat_counts_init(&stat->other_kinds); 6439 } 6440 6441 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 6442 { 6443 int rc; 6444 MDB_cursor *cur; 6445 MDB_val k, v; 6446 MDB_dbi db; 6447 struct ndb_txn txn; 6448 struct ndb_note *note; 6449 int i; 6450 enum ndb_common_kind common_kind; 6451 6452 // initialize to 0 6453 ndb_stat_init(stat); 6454 6455 if (!ndb_begin_query(ndb, &txn)) { 6456 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 6457 
return 0; 6458 } 6459 6460 // stat each dbi in the database 6461 for (i = 0; i < NDB_DBS; i++) 6462 { 6463 db = ndb->lmdb.dbs[i]; 6464 6465 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 6466 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 6467 mdb_strerror(rc)); 6468 return 0; 6469 } 6470 6471 // loop over every entry and count kv sizes 6472 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6473 // we gather more detailed per-kind stats if we're in 6474 // the notes db 6475 if (i == NDB_DB_NOTE) { 6476 note = v.mv_data; 6477 common_kind = ndb_kind_to_common_kind(note->kind); 6478 6479 // uncommon kind? just count them in bulk 6480 if ((int)common_kind == -1) { 6481 stat->other_kinds.count++; 6482 stat->other_kinds.key_size += k.mv_size; 6483 stat->other_kinds.value_size += v.mv_size; 6484 } else { 6485 stat->common_kinds[common_kind].count++; 6486 stat->common_kinds[common_kind].key_size += k.mv_size; 6487 stat->common_kinds[common_kind].value_size += v.mv_size; 6488 } 6489 } 6490 6491 stat->dbs[i].count++; 6492 stat->dbs[i].key_size += k.mv_size; 6493 stat->dbs[i].value_size += v.mv_size; 6494 } 6495 6496 // close the cursor, they are per-dbi 6497 mdb_cursor_close(cur); 6498 } 6499 6500 ndb_end_query(&txn); 6501 6502 return 1; 6503 } 6504 6505 /// Push an element to the current tag 6506 /// 6507 /// Basic idea is to call ndb_builder_new_tag 6508 int ndb_builder_push_tag_str(struct ndb_builder *builder, 6509 const char *str, int len) 6510 { 6511 union ndb_packed_str pstr; 6512 int pack_ids = 1; 6513 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 6514 return 0; 6515 return ndb_builder_finalize_tag(builder, pstr); 6516 } 6517 6518 // 6519 // CONFIG 6520 // 6521 void ndb_default_config(struct ndb_config *config) 6522 { 6523 int cores = get_cpu_cores(); 6524 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 6525 config->ingester_threads = cores == -1 ? 
4 : cores; 6526 config->flags = 0; 6527 config->ingest_filter = NULL; 6528 config->filter_context = NULL; 6529 config->sub_cb_ctx = NULL; 6530 config->sub_cb = NULL; 6531 } 6532 6533 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) 6534 { 6535 config->sub_cb_ctx = context; 6536 config->sub_cb = fn; 6537 } 6538 6539 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 6540 { 6541 config->ingester_threads = threads; 6542 } 6543 6544 void ndb_config_set_flags(struct ndb_config *config, int flags) 6545 { 6546 config->flags = flags; 6547 } 6548 6549 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 6550 { 6551 config->mapsize = mapsize; 6552 } 6553 6554 void ndb_config_set_ingest_filter(struct ndb_config *config, 6555 ndb_ingest_filter_fn fn, void *filter_ctx) 6556 { 6557 config->ingest_filter = fn; 6558 config->filter_context = filter_ctx; 6559 } 6560 6561 int ndb_print_tag_index(struct ndb_txn *txn) 6562 { 6563 MDB_cursor *cur; 6564 MDB_val k, v; 6565 int i; 6566 6567 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur)) 6568 return 0; 6569 6570 i = 1; 6571 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6572 printf("%d ", i); 6573 print_tag_kv(txn, &k, &v); 6574 i++; 6575 } 6576 6577 return 1; 6578 } 6579 6580 int ndb_print_kind_keys(struct ndb_txn *txn) 6581 { 6582 MDB_cursor *cur; 6583 MDB_val k, v; 6584 int i; 6585 struct ndb_u64_ts *tsid; 6586 6587 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur)) 6588 return 0; 6589 6590 i = 1; 6591 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6592 tsid = k.mv_data; 6593 printf("%d note_kind %" PRIu64 " %" PRIu64 "\n", 6594 i, tsid->u64, tsid->timestamp); 6595 6596 i++; 6597 } 6598 6599 return 1; 6600 } 6601 6602 // used by ndb.c 6603 int ndb_print_search_keys(struct ndb_txn *txn) 6604 { 6605 MDB_cursor *cur; 6606 MDB_val k, v; 6607 int i; 6608 struct ndb_text_search_key 
search_key; 6609 6610 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 6611 return 0; 6612 6613 i = 1; 6614 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 6615 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 6616 fprintf(stderr, "error decoding key %d\n", i); 6617 continue; 6618 } 6619 6620 ndb_print_text_search_key(&search_key); 6621 printf("\n"); 6622 6623 i++; 6624 } 6625 6626 return 1; 6627 } 6628 6629 struct ndb_tags *ndb_note_tags(struct ndb_note *note) 6630 { 6631 return ¬e->tags; 6632 } 6633 6634 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr) 6635 { 6636 struct ndb_str str; 6637 str.flag = pstr->packed.flag; 6638 6639 if (str.flag == NDB_PACKED_STR) { 6640 str.str = pstr->packed.str; 6641 return str; 6642 } 6643 6644 str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF); 6645 return str; 6646 } 6647 6648 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) 6649 { 6650 return ndb_note_str(note, &tag->strs[ind]); 6651 } 6652 6653 int ndb_str_len(struct ndb_str *str) 6654 { 6655 if (str->flag == NDB_PACKED_ID) 6656 return 32; 6657 return strlen(str->str); 6658 } 6659 6660 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind) 6661 { 6662 return ndb_tag_str(iter->note, iter->tag, ind); 6663 } 6664 6665 unsigned char * ndb_note_id(struct ndb_note *note) 6666 { 6667 return note->id; 6668 } 6669 6670 unsigned char * ndb_note_pubkey(struct ndb_note *note) 6671 { 6672 return note->pubkey; 6673 } 6674 6675 unsigned char * ndb_note_sig(struct ndb_note *note) 6676 { 6677 return note->sig; 6678 } 6679 6680 uint32_t ndb_note_created_at(struct ndb_note *note) 6681 { 6682 return note->created_at; 6683 } 6684 6685 uint32_t ndb_note_kind(struct ndb_note *note) 6686 { 6687 return note->kind; 6688 } 6689 6690 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind) 6691 { 6692 note->kind = kind; 6693 } 6694 6695 const char 
*ndb_note_content(struct ndb_note *note) 6696 { 6697 return ndb_note_str(note, ¬e->content).str; 6698 } 6699 6700 uint32_t ndb_note_content_length(struct ndb_note *note) 6701 { 6702 return note->content_length; 6703 } 6704 6705 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes) 6706 { 6707 struct ndb_note *note = (struct ndb_note *)bytes; 6708 if (note->version != 1) 6709 return 0; 6710 return note; 6711 } 6712 6713 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) 6714 { 6715 iter->note = note; 6716 iter->tag = NULL; 6717 iter->index = -1; 6718 } 6719 6720 // Helper function to get a pointer to the nth tag 6721 static struct ndb_tag *ndb_tags_tag(struct ndb_tags *tags, size_t index) { 6722 return (struct ndb_tag *)((uint8_t *)tags + sizeof(struct ndb_tags) + index * sizeof(struct ndb_tag)); 6723 } 6724 6725 int ndb_tags_iterate_next(struct ndb_iterator *iter) 6726 { 6727 struct ndb_tags *tags; 6728 6729 if (iter->tag == NULL || iter->index == -1) { 6730 iter->tag = ndb_tags_tag(&iter->note->tags, 0); 6731 iter->index = 0; 6732 return iter->note->tags.count != 0; 6733 } 6734 6735 tags = &iter->note->tags; 6736 6737 if (++iter->index < tags->count) { 6738 uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); 6739 iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size); 6740 return 1; 6741 } 6742 6743 return 0; 6744 } 6745 6746 uint16_t ndb_tags_count(struct ndb_tags *tags) 6747 { 6748 return tags->count; 6749 } 6750 6751 uint16_t ndb_tag_count(struct ndb_tag *tags) 6752 { 6753 return tags->count; 6754 } 6755 6756 enum ndb_common_kind ndb_kind_to_common_kind(int kind) 6757 { 6758 switch (kind) 6759 { 6760 case 0: return NDB_CKIND_PROFILE; 6761 case 1: return NDB_CKIND_TEXT; 6762 case 3: return NDB_CKIND_CONTACTS; 6763 case 4: return NDB_CKIND_DM; 6764 case 5: return NDB_CKIND_DELETE; 6765 case 6: return NDB_CKIND_REPOST; 6766 case 7: return NDB_CKIND_REACTION; 6767 case 9735: return 
NDB_CKIND_ZAP; 6768 case 9734: return NDB_CKIND_ZAP_REQUEST; 6769 case 23194: return NDB_CKIND_NWC_REQUEST; 6770 case 23195: return NDB_CKIND_NWC_RESPONSE; 6771 case 27235: return NDB_CKIND_HTTP_AUTH; 6772 case 30000: return NDB_CKIND_LIST; 6773 case 30023: return NDB_CKIND_LONGFORM; 6774 case 30315: return NDB_CKIND_STATUS; 6775 } 6776 6777 return -1; 6778 } 6779 6780 const char *ndb_kind_name(enum ndb_common_kind ck) 6781 { 6782 switch (ck) { 6783 case NDB_CKIND_PROFILE: return "profile"; 6784 case NDB_CKIND_TEXT: return "text"; 6785 case NDB_CKIND_CONTACTS: return "contacts"; 6786 case NDB_CKIND_DM: return "dm"; 6787 case NDB_CKIND_DELETE: return "delete"; 6788 case NDB_CKIND_REPOST: return "repost"; 6789 case NDB_CKIND_REACTION: return "reaction"; 6790 case NDB_CKIND_ZAP: return "zap"; 6791 case NDB_CKIND_ZAP_REQUEST: return "zap_request"; 6792 case NDB_CKIND_NWC_REQUEST: return "nwc_request"; 6793 case NDB_CKIND_NWC_RESPONSE: return "nwc_response"; 6794 case NDB_CKIND_HTTP_AUTH: return "http_auth"; 6795 case NDB_CKIND_LIST: return "list"; 6796 case NDB_CKIND_LONGFORM: return "longform"; 6797 case NDB_CKIND_STATUS: return "status"; 6798 case NDB_CKIND_COUNT: return "unknown"; 6799 } 6800 6801 return "unknown"; 6802 } 6803 6804 const char *ndb_db_name(enum ndb_dbs db) 6805 { 6806 switch (db) { 6807 case NDB_DB_NOTE: 6808 return "note"; 6809 case NDB_DB_META: 6810 return "note_metadata"; 6811 case NDB_DB_PROFILE: 6812 return "profile"; 6813 case NDB_DB_NOTE_ID: 6814 return "note_index"; 6815 case NDB_DB_PROFILE_PK: 6816 return "profile_pubkey_index"; 6817 case NDB_DB_NDB_META: 6818 return "nostrdb_metadata"; 6819 case NDB_DB_PROFILE_SEARCH: 6820 return "profile_search"; 6821 case NDB_DB_PROFILE_LAST_FETCH: 6822 return "profile_last_fetch"; 6823 case NDB_DB_NOTE_KIND: 6824 return "note_kind_index"; 6825 case NDB_DB_NOTE_TEXT: 6826 return "note_fulltext"; 6827 case NDB_DB_NOTE_BLOCKS: 6828 return "note_blocks"; 6829 case NDB_DB_NOTE_TAGS: 6830 return "note_tags"; 
6831 case NDB_DB_NOTE_PUBKEY: 6832 return "note_pubkey_index"; 6833 case NDB_DB_NOTE_PUBKEY_KIND: 6834 return "note_pubkey_kind_index"; 6835 case NDB_DBS: 6836 return "count"; 6837 } 6838 6839 return "unknown"; 6840 } 6841 6842 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note) 6843 { 6844 const char *content; 6845 size_t content_len; 6846 struct ndb_blocks *blocks; 6847 6848 content = ndb_note_content(note); 6849 content_len = ndb_note_content_length(note); 6850 6851 // something weird is going on 6852 if (content_len >= INT32_MAX) 6853 return NULL; 6854 6855 unsigned char *buffer = malloc(content_len); 6856 if (!buffer) 6857 return NULL; 6858 6859 if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) { 6860 free(buffer); 6861 return NULL; 6862 } 6863 6864 //blocks = realloc(blocks, ndb_blocks_total_size(blocks)); 6865 //if (blocks == NULL) 6866 //return NULL; 6867 6868 blocks->flags |= NDB_BLOCK_FLAG_OWNED; 6869 6870 return blocks; 6871 } 6872 6873 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key) 6874 { 6875 struct ndb_blocks *blocks, *blocks_to_writer; 6876 size_t blocks_size; 6877 struct ndb_note *note; 6878 size_t note_len; 6879 6880 if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, ¬e_len))) { 6881 return blocks; 6882 } 6883 6884 // If we don't have note blocks, let's lazily generate them. 
This is 6885 // migration-friendly instead of doing them all at once 6886 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_len))) { 6887 // no note found, can't return note blocks 6888 return NULL; 6889 } 6890 6891 if (!(blocks = ndb_note_to_blocks(note))) 6892 return NULL; 6893 6894 // send a copy to the writer 6895 blocks_size = ndb_blocks_total_size(blocks); 6896 blocks_to_writer = malloc(blocks_size); 6897 memcpy(blocks_to_writer, blocks, blocks_size); 6898 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED); 6899 6900 // we generated new blocks, let's store them in the DB 6901 struct ndb_writer_blocks write_blocks = { 6902 .blocks = blocks_to_writer, 6903 .note_key = note_key 6904 }; 6905 6906 assert(write_blocks.blocks != blocks); 6907 6908 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS }; 6909 msg.blocks = write_blocks; 6910 6911 ndb_writer_queue_msg(&ndb->writer, &msg); 6912 6913 return blocks; 6914 } 6915 6916 // please call ndb_monitor_lock before calling this 6917 static struct ndb_subscription * 6918 ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index) 6919 { 6920 struct ndb_subscription *sub, *tsub; 6921 int i; 6922 6923 for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) { 6924 tsub = &monitor->subscriptions[i]; 6925 if (tsub->subid == subid) { 6926 sub = tsub; 6927 if (index) 6928 *index = i; 6929 break; 6930 } 6931 } 6932 6933 return sub; 6934 } 6935 6936 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 6937 int note_id_capacity) 6938 { 6939 struct ndb_subscription *sub; 6940 int res; 6941 6942 if (subid == 0) 6943 return 0; 6944 6945 ndb_monitor_lock(&ndb->monitor); 6946 6947 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) 6948 res = 0; 6949 else 6950 res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); 6951 6952 ndb_monitor_unlock(&ndb->monitor); 6953 6954 return res; 6955 } 6956 6957 int ndb_wait_for_notes(struct ndb *ndb, uint64_t 
subid, uint64_t *note_ids, 6958 int note_id_capacity) 6959 { 6960 struct ndb_subscription *sub; 6961 struct prot_queue *queue_inbox; 6962 6963 // this is not a valid subscription id 6964 if (subid == 0) 6965 return 0; 6966 6967 ndb_monitor_lock(&ndb->monitor); 6968 6969 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) { 6970 ndb_monitor_unlock(&ndb->monitor); 6971 return 0; 6972 } 6973 6974 queue_inbox = &sub->inbox; 6975 6976 ndb_monitor_unlock(&ndb->monitor); 6977 6978 // there is technically a race condition if the thread yeilds at this 6979 // comment and a subscription is added/removed. A deadlock in the 6980 // writer queue would be much worse though. This function is dubious 6981 // anyways. 6982 6983 return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity); 6984 } 6985 6986 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) 6987 { 6988 struct ndb_subscription *sub; 6989 int index, res, elems_to_move; 6990 6991 ndb_monitor_lock(&ndb->monitor); 6992 6993 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) { 6994 res = 0; 6995 goto done; 6996 } 6997 6998 ndb_subscription_destroy(sub); 6999 7000 elems_to_move = (--ndb->monitor.num_subscriptions) - index; 7001 7002 memmove(&ndb->monitor.subscriptions[index], 7003 &ndb->monitor.subscriptions[index+1], 7004 elems_to_move * sizeof(*sub)); 7005 7006 res = 1; 7007 7008 done: 7009 ndb_monitor_unlock(&ndb->monitor); 7010 7011 return res; 7012 } 7013 7014 int ndb_num_subscriptions(struct ndb *ndb) 7015 { 7016 return ndb->monitor.num_subscriptions; 7017 } 7018 7019 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters) 7020 { 7021 static uint64_t subids = 0; 7022 struct ndb_subscription *sub; 7023 size_t buflen; 7024 uint64_t subid; 7025 char *buf; 7026 7027 ndb_monitor_lock(&ndb->monitor); 7028 7029 if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) { 7030 fprintf(stderr, "too many subscriptions\n"); 7031 subid = 0; 7032 
goto done; 7033 } 7034 7035 sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions]; 7036 subid = ++subids; 7037 sub->subid = subid; 7038 7039 ndb_filter_group_init(&sub->group); 7040 if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) { 7041 subid = 0; 7042 goto done; 7043 } 7044 7045 // 500k ought to be enough for anyone 7046 buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE; 7047 buf = malloc(buflen); 7048 7049 if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) { 7050 fprintf(stderr, "failed to push prot queue\n"); 7051 subid = 0; 7052 goto done; 7053 } 7054 7055 ndb->monitor.num_subscriptions++; 7056 done: 7057 ndb_monitor_unlock(&ndb->monitor); 7058 7059 return subid; 7060 }