nostrdb.c (192618B)
1 2 #include "nostrdb.h" 3 #include "jsmn.h" 4 #include "hex.h" 5 #include "cursor.h" 6 #include "random.h" 7 #include "ccan/crypto/sha256/sha256.h" 8 #include "bolt11/bolt11.h" 9 #include "bolt11/amount.h" 10 #include "lmdb.h" 11 #include "util.h" 12 #include "cpu.h" 13 #include "block.h" 14 #include "threadpool.h" 15 #include "thread.h" 16 #include "protected_queue.h" 17 #include "memchr.h" 18 #include "print_util.h" 19 #include <stdlib.h> 20 #include <limits.h> 21 #include <assert.h> 22 23 #include "bindings/c/profile_json_parser.h" 24 #include "bindings/c/profile_builder.h" 25 #include "bindings/c/meta_builder.h" 26 #include "bindings/c/meta_reader.h" 27 #include "bindings/c/profile_verifier.h" 28 #include "secp256k1.h" 29 #include "secp256k1_ecdh.h" 30 #include "secp256k1_schnorrsig.h" 31 32 #define max(a,b) ((a) > (b) ? (a) : (b)) 33 #define min(a,b) ((a) < (b) ? (a) : (b)) 34 35 // the maximum number of things threads pop and push in bulk 36 #define THREAD_QUEUE_BATCH 4096 37 38 // maximum number of active subscriptions 39 #define MAX_SUBSCRIPTIONS 256 40 #define MAX_SCAN_CURSORS 12 41 #define MAX_FILTERS 16 42 43 // the maximum size of inbox queues 44 static const int DEFAULT_QUEUE_SIZE = 32768; 45 46 // 2mb scratch size for the writer thread 47 static const int DEFAULT_WRITER_SCRATCH_SIZE = 2097152; 48 49 // increase if we need bigger filters 50 #define NDB_FILTER_PAGES 64 51 52 #define ndb_flag_set(flags, f) ((flags & f) == f) 53 54 #define NDB_PARSED_ID (1 << 0) 55 #define NDB_PARSED_PUBKEY (1 << 1) 56 #define NDB_PARSED_SIG (1 << 2) 57 #define NDB_PARSED_CREATED_AT (1 << 3) 58 #define NDB_PARSED_KIND (1 << 4) 59 #define NDB_PARSED_CONTENT (1 << 5) 60 #define NDB_PARSED_TAGS (1 << 6) 61 #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) 62 63 typedef int (*ndb_migrate_fn)(struct ndb_txn *); 64 typedef int (*ndb_word_parser_fn)(void *, const char *word, int 
word_len, 65 int word_index); 66 67 // these must be byte-aligned, they are directly accessing the serialized data 68 // representation 69 #pragma pack(push, 1) 70 71 union ndb_packed_str { 72 struct { 73 char str[3]; 74 // we assume little endian everywhere. sorry not sorry. 75 unsigned char flag; // NDB_PACKED_STR, etc 76 } packed; 77 78 uint32_t offset; 79 unsigned char bytes[4]; 80 }; 81 82 struct ndb_tag { 83 uint16_t count; 84 union ndb_packed_str strs[0]; 85 }; 86 87 struct ndb_tags { 88 uint16_t padding; 89 uint16_t count; 90 }; 91 92 // v1 93 struct ndb_note { 94 unsigned char version; // v=1 95 unsigned char padding[3]; // keep things aligned 96 unsigned char id[32]; 97 unsigned char pubkey[32]; 98 unsigned char sig[64]; 99 100 uint64_t created_at; 101 uint32_t kind; 102 uint32_t content_length; 103 union ndb_packed_str content; 104 uint32_t strings; 105 // nothing can come after tags since it contains variadic data 106 struct ndb_tags tags; 107 }; 108 109 #pragma pack(pop) 110 111 112 struct ndb_migration { 113 ndb_migrate_fn fn; 114 }; 115 116 struct ndb_profile_record_builder { 117 flatcc_builder_t *builder; 118 void *flatbuf; 119 }; 120 121 // controls whether to continue or stop the json parser 122 enum ndb_idres { 123 NDB_IDRES_CONT, 124 NDB_IDRES_STOP, 125 }; 126 127 // closure data for the id-detecting ingest controller 128 struct ndb_ingest_controller 129 { 130 MDB_txn *read_txn; 131 struct ndb_lmdb *lmdb; 132 struct ndb_note *note; 133 uint64_t note_key; 134 }; 135 136 enum ndb_writer_msgtype { 137 NDB_WRITER_QUIT, // kill thread immediately 138 NDB_WRITER_NOTE, // write a note to the db 139 NDB_WRITER_PROFILE, // write a profile to the db 140 NDB_WRITER_DBMETA, // write ndb metadata 141 NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched 142 NDB_WRITER_BLOCKS, // write parsed note blocks 143 NDB_WRITER_MIGRATE, // migrate the database 144 NDB_WRITER_NOTE_RELAY, // we already have the note, but we have more relays to write 145 }; 

// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
enum ndb_meta_key {
	NDB_META_KEY_VERSION = 1
};

// Bundles a JSON input buffer, a jsmn parser with its token array
// (`toks` .. `toks_end`) and an ndb_builder.
struct ndb_json_parser {
	const char *json;
	int json_len;
	struct ndb_builder builder;
	jsmn_parser json_parser;
	jsmntok_t *toks, *toks_end;
	int i;
	int num_tokens;
};

// useful to pass to threads on its own
struct ndb_lmdb {
	MDB_env *env;
	MDB_dbi dbs[NDB_DBS];
};

// Writer state: a thread handle plus the inbox queue and its backing buffer.
// NOTE(review): presumably a single writer thread drains `inbox` -- confirm
// against the thread start routine.
struct ndb_writer {
	struct ndb_lmdb *lmdb;
	struct ndb_monitor *monitor;

	int scratch_size;
	uint32_t ndb_flags;
	void *queue_buf;
	int queue_buflen;
	pthread_t thread_id;

	struct prot_queue inbox;
};

// Ingester state: a thread pool, a pointer to the writer's inbox and an
// optional ingest filter callback with its context.
struct ndb_ingester {
	struct ndb_lmdb *lmdb;
	uint32_t flags;
	struct threadpool tp;
	struct prot_queue *writer_inbox;
	void *filter_context;
	ndb_ingest_filter_fn filter;
};

// A fixed-capacity list of filters; `num_filters` is the number of slots in
// use (at most MAX_FILTERS)
struct ndb_filter_group {
	struct ndb_filter filters[MAX_FILTERS];
	int num_filters;
};

// A subscription: its id, the filters it was created with and its own inbox
struct ndb_subscription {
	uint64_t subid;
	struct ndb_filter_group group;
	struct prot_queue inbox;
};

// Holds up to MAX_SUBSCRIPTIONS subscriptions and the subscription callback
struct ndb_monitor {
	struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
	ndb_sub_fn sub_cb;
	void *sub_cb_ctx;
	int num_subscriptions;

	// monitor isn't a full inbox. We want pollers to be able to poll
	// subscriptions efficiently without going through a message queue, so
	// we use a simple mutex here.
	pthread_mutex_t mutex;
};

// Top-level database handle: embeds the lmdb handles, the ingester, the
// subscription monitor and the writer
struct ndb {
	struct ndb_lmdb lmdb;
	struct ndb_ingester ingester;
	struct ndb_monitor monitor;
	struct ndb_writer writer;
	int version;
	uint32_t flags; // setting flags
	// lmdb environ handles, etc
};

///
/// Query Plans
///
/// There are general strategies for performing certain types of query
/// depending on the filter.
For example, for large contact list queries 228 /// with many authors, we simply do a descending scan on created_at 229 /// instead of doing 1000s of pubkey scans. 230 /// 231 /// Query plans are calculated from filters via `ndb_filter_plan` 232 /// 233 enum ndb_query_plan { 234 NDB_PLAN_KINDS, 235 NDB_PLAN_IDS, 236 NDB_PLAN_AUTHORS, 237 NDB_PLAN_AUTHOR_KINDS, 238 NDB_PLAN_CREATED, 239 NDB_PLAN_TAGS, 240 NDB_PLAN_SEARCH, 241 }; 242 243 // A id + u64 + timestamp 244 struct ndb_id_u64_ts { 245 unsigned char id[32]; // pubkey, id, etc 246 uint64_t u64; // kind, etc 247 uint64_t timestamp; 248 }; 249 250 // A clustered key with an id and a timestamp 251 struct ndb_tsid { 252 unsigned char id[32]; 253 uint64_t timestamp; 254 }; 255 256 // A u64 + timestamp id. Just using this for kinds at the moment. 257 struct ndb_u64_ts { 258 uint64_t u64; // kind, etc 259 uint64_t timestamp; 260 }; 261 262 struct ndb_word 263 { 264 const char *word; 265 int word_len; 266 }; 267 268 struct ndb_search_words 269 { 270 struct ndb_word words[MAX_TEXT_SEARCH_WORDS]; 271 int num_words; 272 }; 273 274 275 static inline int is_replaceable_kind(uint64_t kind) 276 { 277 return kind == 0 || kind == 3 278 || (10000 <= kind && kind < 20000) 279 || (30000 <= kind && kind < 40000); 280 } 281 282 283 // ndb_text_search_key 284 // 285 // This is compressed when in lmdb: 286 // 287 // note_id: varint 288 // strlen: varint 289 // str: cstr 290 // timestamp: varint 291 // word_index: varint 292 // 293 static int ndb_make_text_search_key(unsigned char *buf, int bufsize, 294 int word_index, int word_len, const char *str, 295 uint64_t timestamp, uint64_t note_id, 296 int *keysize) 297 { 298 struct cursor cur; 299 make_cursor(buf, buf + bufsize, &cur); 300 301 // TODO: need update this to uint64_t 302 // we push this first because our query function can pull this off 303 // quickly to check matches 304 if (!cursor_push_varint(&cur, (int32_t)note_id)) 305 return 0; 306 307 // string length 308 if 
(!cursor_push_varint(&cur, word_len)) 309 return 0; 310 311 // non-null terminated, lowercase string 312 if (!cursor_push_lowercase(&cur, str, word_len)) 313 return 0; 314 315 // TODO: need update this to uint64_t 316 if (!cursor_push_varint(&cur, (int)timestamp)) 317 return 0; 318 319 // the index of the word in the content so that we can do more accurate 320 // phrase searches 321 if (!cursor_push_varint(&cur, word_index)) 322 return 0; 323 324 // pad to 8-byte alignment 325 if (!cursor_align(&cur, 8)) 326 return 0; 327 328 *keysize = cur.p - cur.start; 329 assert((*keysize % 8) == 0); 330 331 return 1; 332 } 333 334 static int ndb_make_noted_text_search_key(unsigned char *buf, int bufsize, 335 int wordlen, const char *word, 336 uint64_t timestamp, uint64_t note_id, 337 int *keysize) 338 { 339 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 340 timestamp, note_id, keysize); 341 } 342 343 static int ndb_make_text_search_key_low(unsigned char *buf, int bufsize, 344 int wordlen, const char *word, 345 uint64_t since, 346 int *keysize) 347 { 348 uint64_t note_id; 349 note_id = 0; 350 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 351 since, note_id, keysize); 352 } 353 354 static int ndb_make_text_search_key_high(unsigned char *buf, int bufsize, 355 int wordlen, const char *word, 356 uint64_t until, 357 int *keysize) 358 { 359 uint64_t note_id; 360 note_id = INT32_MAX; 361 return ndb_make_text_search_key(buf, bufsize, 0, wordlen, word, 362 until, note_id, keysize); 363 } 364 365 typedef int (*ndb_text_search_key_order_fn)(unsigned char *buf, int bufsize, int wordlen, const char *word, uint64_t timestamp, int *keysize); 366 367 /** From LMDB: Compare two items lexically */ 368 static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) { 369 int diff; 370 ssize_t len_diff; 371 unsigned int len; 372 373 len = a->mv_size; 374 len_diff = (ssize_t) a->mv_size - (ssize_t) b->mv_size; 375 if (len_diff > 0) { 376 len = b->mv_size; 377 len_diff 
= 1; 378 } 379 380 diff = memcmp(a->mv_data, b->mv_data, len); 381 return diff ? diff : len_diff<0 ? -1 : len_diff; 382 } 383 384 static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) 385 { 386 MDB_val va, vb; 387 uint64_t ts_a, ts_b; 388 int cmp; 389 390 va.mv_data = a->mv_data; 391 va.mv_size = a->mv_size - 8; 392 393 vb.mv_data = b->mv_data; 394 vb.mv_size = b->mv_size - 8; 395 396 if ((cmp = mdb_cmp_memn(&va, &vb))) 397 return cmp; 398 399 ts_a = *(uint64_t*)((unsigned char *)va.mv_data + va.mv_size); 400 ts_b = *(uint64_t*)((unsigned char *)vb.mv_data + vb.mv_size); 401 402 if (ts_a < ts_b) 403 return -1; 404 else if (ts_a > ts_b) 405 return 1; 406 407 return 0; 408 } 409 410 static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) 411 { 412 struct cursor ca, cb; 413 uint64_t sa, sb, nid_a, nid_b; 414 MDB_val a2, b2; 415 416 make_cursor(a->mv_data, (unsigned char *)a->mv_data + a->mv_size, &ca); 417 make_cursor(b->mv_data, (unsigned char *)b->mv_data + b->mv_size, &cb); 418 419 // note_id 420 if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b))) 421 return 0; 422 423 // string size 424 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 425 return 0; 426 427 a2.mv_data = ca.p; 428 a2.mv_size = sa; 429 430 b2.mv_data = cb.p; 431 b2.mv_size = sb; 432 433 int cmp = mdb_cmp_memn(&a2, &b2); 434 if (cmp) return cmp; 435 436 // skip over string 437 ca.p += sa; 438 cb.p += sb; 439 440 // timestamp 441 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 442 return 0; 443 444 if (sa < sb) return -1; 445 else if (sa > sb) return 1; 446 447 // note_id 448 if (nid_a < nid_b) return -1; 449 else if (nid_a > nid_b) return 1; 450 451 // word index 452 if (unlikely(!cursor_pull_varint(&ca, &sa) || !cursor_pull_varint(&cb, &sb))) 453 return 0; 454 455 if (sa < sb) return -1; 456 else if (sa > sb) return 1; 457 458 return 0; 459 } 460 461 static inline int 
ndb_unpack_text_search_key_noteid( 462 struct cursor *cur, uint64_t *note_id) 463 { 464 if (!cursor_pull_varint(cur, note_id)) 465 return 0; 466 467 return 1; 468 } 469 470 // faster peek of just the string instead of unpacking everything 471 // this is used to quickly discard range query matches if there is no 472 // common prefix 473 static inline int ndb_unpack_text_search_key_string(struct cursor *cur, 474 const char **str, 475 int *str_len) 476 { 477 uint64_t len; 478 479 if (!cursor_pull_varint(cur, &len)) 480 return 0; 481 482 *str_len = len; 483 484 *str = (const char *)cur->p; 485 486 if (!cursor_skip(cur, *str_len)) 487 return 0; 488 489 return 1; 490 } 491 492 // should be called after ndb_unpack_text_search_key_string. It continues 493 // the unpacking of a text search key if we've already started it. 494 static inline int 495 ndb_unpack_remaining_text_search_key(struct cursor *cur, 496 struct ndb_text_search_key *key) 497 { 498 if (!cursor_pull_varint(cur, &key->timestamp)) 499 return 0; 500 501 if (!cursor_pull_varint(cur, &key->word_index)) 502 return 0; 503 504 return 1; 505 } 506 507 // unpack a fulltext search key 508 // 509 // full version of string + unpack remaining. This is split up because text 510 // searching only requires to pull the string for prefix searching, and the 511 // remaining is optional 512 static inline int ndb_unpack_text_search_key(unsigned char *p, int len, 513 struct ndb_text_search_key *key) 514 { 515 struct cursor c; 516 make_cursor(p, p + len, &c); 517 518 if (!ndb_unpack_text_search_key_noteid(&c, &key->note_id)) 519 return 0; 520 521 if (!ndb_unpack_text_search_key_string(&c, &key->str, &key->str_len)) 522 return 0; 523 524 return ndb_unpack_remaining_text_search_key(&c, key); 525 } 526 527 // Copies only lowercase characters to the destination string and fills the rest with null bytes. 528 // `dst` and `src` are pointers to the destination and source strings, respectively. 
529 // `n` is the maximum number of characters to copy. 530 static void lowercase_strncpy(char *dst, const char *src, int n) { 531 int j = 0, i = 0; 532 533 if (!dst || !src || n == 0) { 534 return; 535 } 536 537 while (src[i] != '\0' && j < n) { 538 dst[j++] = tolower(src[i++]); 539 } 540 541 // Null-terminate and fill the destination string 542 while (j < n) { 543 dst[j++] = '\0'; 544 } 545 } 546 547 static inline int ndb_filter_elem_is_ptr(struct ndb_filter_field *field) { 548 return field->elem_type == NDB_ELEMENT_STRING || field->elem_type == NDB_ELEMENT_ID; 549 } 550 551 // Copy the filter 552 int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src) 553 { 554 size_t src_size, elem_size, data_size; 555 556 memcpy(dst, src, sizeof(*src)); 557 558 elem_size = src->elem_buf.end - src->elem_buf.start; 559 data_size = src->data_buf.end - src->data_buf.start; 560 src_size = data_size + elem_size; 561 562 // let's only allow finalized filters to be cloned 563 if (!src || !src->finalized) 564 return 0; 565 566 dst->elem_buf.start = malloc(src_size); 567 dst->elem_buf.end = dst->elem_buf.start + elem_size; 568 dst->elem_buf.p = dst->elem_buf.end; 569 570 dst->data_buf.start = dst->elem_buf.start + elem_size; 571 dst->data_buf.end = dst->data_buf.start + data_size; 572 dst->data_buf.p = dst->data_buf.end; 573 574 if (dst->elem_buf.start == NULL) 575 return 0; 576 577 memcpy(dst->elem_buf.start, src->elem_buf.start, src_size); 578 579 return 1; 580 } 581 582 // "Finalize" the filter. This resizes the allocated heap buffers so that they 583 // are as small as possible. 
This also prevents new fields from being added 584 int ndb_filter_end(struct ndb_filter *filter) 585 { 586 #ifdef NDB_LOG 587 size_t orig_size; 588 #endif 589 size_t data_len, elem_len; 590 if (filter->finalized == 1) 591 return 0; 592 593 // move the data buffer to the end of the element buffer and update 594 // all of the element pointers accordingly 595 data_len = filter->data_buf.p - filter->data_buf.start; 596 elem_len = filter->elem_buf.p - filter->elem_buf.start; 597 #ifdef NDB_LOG 598 orig_size = filter->data_buf.end - filter->elem_buf.start; 599 #endif 600 601 // cap the elem buff 602 filter->elem_buf.end = filter->elem_buf.p; 603 604 // move the data buffer to the end of the element buffer 605 memmove(filter->elem_buf.p, filter->data_buf.start, data_len); 606 607 // realloc the whole thing 608 filter->elem_buf.start = realloc(filter->elem_buf.start, elem_len + data_len); 609 filter->elem_buf.end = filter->elem_buf.start + elem_len; 610 filter->elem_buf.p = filter->elem_buf.end; 611 612 filter->data_buf.start = filter->elem_buf.end; 613 filter->data_buf.end = filter->data_buf.start + data_len; 614 filter->data_buf.p = filter->data_buf.end; 615 616 filter->finalized = 1; 617 618 ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len); 619 620 return 1; 621 } 622 623 static inline struct ndb_filter_elements * 624 ndb_filter_get_elements_by_offset(const struct ndb_filter *filter, int offset) 625 { 626 struct ndb_filter_elements *els; 627 628 if (offset < 0) 629 return NULL; 630 631 els = (struct ndb_filter_elements *)(filter->elem_buf.start + offset); 632 633 if ((unsigned char *)els > filter->elem_buf.p) 634 return NULL; 635 636 return els; 637 } 638 639 struct ndb_filter_elements * 640 ndb_filter_current_element(const struct ndb_filter *filter) 641 { 642 return ndb_filter_get_elements_by_offset(filter, filter->current); 643 } 644 645 struct ndb_filter_elements * 646 ndb_filter_get_elements(const struct ndb_filter *filter, int index) 647 { 
648 if (filter->num_elements <= 0) 649 return NULL; 650 651 if (index > filter->num_elements-1) 652 return NULL; 653 654 return ndb_filter_get_elements_by_offset(filter, filter->elements[index]); 655 } 656 657 static inline unsigned char * 658 ndb_filter_elements_data(const struct ndb_filter *filter, int offset) 659 { 660 unsigned char *data; 661 662 if (offset < 0) 663 return NULL; 664 665 data = filter->data_buf.start + offset; 666 if (data > filter->data_buf.p) 667 return NULL; 668 669 return data; 670 } 671 672 unsigned char * 673 ndb_filter_get_id_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 674 { 675 return ndb_filter_elements_data(filter, els->elements[index]); 676 } 677 678 const char * 679 ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_filter_elements *els, int index) 680 { 681 return (const char *)ndb_filter_elements_data(filter, els->elements[index]); 682 } 683 684 uint64_t * 685 ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index) 686 { 687 return &els->elements[index]; 688 } 689 690 uint64_t 691 ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index) 692 { 693 return els->elements[index]; 694 } 695 696 int ndb_filter_init_with(struct ndb_filter *filter, int pages) 697 { 698 struct cursor cur; 699 int page_size, elem_size, data_size, buf_size; 700 701 page_size = 4096; // assuming this, not a big deal if we're wrong 702 703 buf_size = page_size * pages; 704 elem_size = buf_size / 4; 705 data_size = buf_size - elem_size; 706 707 unsigned char *buf = malloc(buf_size); 708 if (!buf) 709 return 0; 710 711 // init memory arena for the cursor 712 make_cursor(buf, buf + buf_size, &cur); 713 714 cursor_slice(&cur, &filter->elem_buf, elem_size); 715 cursor_slice(&cur, &filter->data_buf, data_size); 716 717 // make sure we are fully allocated 718 assert(cur.p == cur.end); 719 720 // make sure elem_buf is the start of the buffer 721 
assert(filter->elem_buf.start == cur.start); 722 723 filter->num_elements = 0; 724 filter->elements[0] = 0; 725 filter->current = -1; 726 filter->finalized = 0; 727 728 return 1; 729 } 730 731 int ndb_filter_init(struct ndb_filter *filter) { 732 return ndb_filter_init_with(filter, NDB_FILTER_PAGES); 733 } 734 735 void ndb_filter_destroy(struct ndb_filter *filter) 736 { 737 if (filter->elem_buf.start) 738 free(filter->elem_buf.start); 739 740 memset(filter, 0, sizeof(*filter)); 741 } 742 743 static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field) 744 { 745 switch (field) { 746 case NDB_FILTER_IDS: return "ids"; 747 case NDB_FILTER_AUTHORS: return "authors"; 748 case NDB_FILTER_KINDS: return "kinds"; 749 case NDB_FILTER_TAGS: return "tags"; 750 case NDB_FILTER_SINCE: return "since"; 751 case NDB_FILTER_UNTIL: return "until"; 752 case NDB_FILTER_LIMIT: return "limit"; 753 case NDB_FILTER_SEARCH: return "search"; 754 } 755 756 return "unknown"; 757 } 758 759 static int ndb_filter_start_field_impl(struct ndb_filter *filter, enum ndb_filter_fieldtype field, char tag) 760 { 761 int i; 762 struct ndb_filter_elements *els, *el; 763 764 if (ndb_filter_current_element(filter)) { 765 fprintf(stderr, "ndb_filter_start_field: filter field already in progress, did you forget to call ndb_filter_end_field?\n"); 766 return 0; 767 } 768 769 // you can only start and end fields once 770 for (i = 0; i < filter->num_elements; i++) { 771 el = ndb_filter_get_elements(filter, i); 772 assert(el); 773 // TODO: fix this tags check to try to find matching tags 774 if (el->field.type == field && field != NDB_FILTER_TAGS) { 775 fprintf(stderr, "ndb_filter_start_field: field '%s' already exists\n", 776 ndb_filter_field_name(field)); 777 return 0; 778 } 779 } 780 781 filter->current = filter->elem_buf.p - filter->elem_buf.start; 782 els = ndb_filter_current_element(filter); 783 assert(els); 784 785 // advance elem buffer to the variable data section 786 if 
(!cursor_skip(&filter->elem_buf, sizeof(struct ndb_filter_elements))) { 787 fprintf(stderr, "ndb_filter_start_field: '%s' oom (todo: realloc?)\n", 788 ndb_filter_field_name(field)); 789 return 0; 790 } 791 792 els->field.type = field; 793 els->field.tag = tag; 794 els->field.elem_type = 0; 795 els->count = 0; 796 797 return 1; 798 } 799 800 int ndb_filter_start_field(struct ndb_filter *filter, enum ndb_filter_fieldtype field) 801 { 802 return ndb_filter_start_field_impl(filter, field, 0); 803 } 804 805 int ndb_filter_start_tag_field(struct ndb_filter *filter, char tag) 806 { 807 return ndb_filter_start_field_impl(filter, NDB_FILTER_TAGS, tag); 808 } 809 810 static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_element el) 811 { 812 struct ndb_filter_elements *current; 813 uint64_t offset; 814 815 if (!(current = ndb_filter_current_element(filter))) 816 return 0; 817 818 offset = filter->data_buf.p - filter->data_buf.start; 819 820 switch (current->field.type) { 821 case NDB_FILTER_IDS: 822 case NDB_FILTER_AUTHORS: 823 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 824 return 0; 825 break; 826 case NDB_FILTER_KINDS: 827 offset = el.integer; 828 break; 829 case NDB_FILTER_SINCE: 830 case NDB_FILTER_UNTIL: 831 case NDB_FILTER_LIMIT: 832 // only one allowed for since/until 833 if (current->count != 0) 834 return 0; 835 offset = el.integer; 836 break; 837 case NDB_FILTER_SEARCH: 838 if (current->field.elem_type != NDB_ELEMENT_STRING) { 839 return 0; 840 } 841 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len)) 842 return 0; 843 if (!cursor_push_byte(&filter->data_buf, 0)) 844 return 0; 845 break; 846 case NDB_FILTER_TAGS: 847 switch (current->field.elem_type) { 848 case NDB_ELEMENT_ID: 849 if (!cursor_push(&filter->data_buf, (unsigned char *)el.id, 32)) 850 return 0; 851 break; 852 case NDB_ELEMENT_STRING: 853 if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, 
el.string.len)) 854 return 0; 855 if (!cursor_push_byte(&filter->data_buf, 0)) 856 return 0; 857 break; 858 case NDB_ELEMENT_INT: 859 // ints are not allowed in tag filters 860 case NDB_ELEMENT_UNKNOWN: 861 return 0; 862 } 863 // push a pointer of the string in the databuf as an element 864 break; 865 } 866 867 if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset, 868 sizeof(offset))) { 869 return 0; 870 } 871 872 current->count++; 873 874 return 1; 875 } 876 877 static int ndb_filter_set_elem_type(struct ndb_filter *filter, 878 enum ndb_generic_element_type elem_type) 879 { 880 enum ndb_generic_element_type current_elem_type; 881 struct ndb_filter_elements *current; 882 883 if (!(current = ndb_filter_current_element(filter))) 884 return 0; 885 886 current_elem_type = current->field.elem_type; 887 888 // element types must be uniform 889 if (current_elem_type != elem_type && current_elem_type != NDB_ELEMENT_UNKNOWN) { 890 fprintf(stderr, "ndb_filter_set_elem_type: element types must be uniform\n"); 891 return 0; 892 } 893 894 current->field.elem_type = elem_type; 895 896 return 1; 897 } 898 899 900 int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, int len) 901 { 902 union ndb_filter_element el; 903 struct ndb_filter_elements *current; 904 905 if (!(current = ndb_filter_current_element(filter))) 906 return 0; 907 908 // only generic tags and search queries are allowed to have strings 909 switch (current->field.type) { 910 case NDB_FILTER_SINCE: 911 case NDB_FILTER_UNTIL: 912 case NDB_FILTER_LIMIT: 913 case NDB_FILTER_IDS: 914 case NDB_FILTER_AUTHORS: 915 case NDB_FILTER_KINDS: 916 return 0; 917 case NDB_FILTER_SEARCH: 918 if (current->count == 1) { 919 // you can't add more than one string to a search 920 return 0; 921 } 922 break; 923 case NDB_FILTER_TAGS: 924 break; 925 } 926 927 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_STRING)) 928 return 0; 929 930 el.string.string = str; 931 el.string.len = len; 932 933 return 
ndb_filter_add_element(filter, el); 934 } 935 936 int ndb_filter_add_str_element(struct ndb_filter *filter, const char *str) 937 { 938 return ndb_filter_add_str_element_len(filter, str, strlen(str)); 939 } 940 941 int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer) 942 { 943 union ndb_filter_element el; 944 struct ndb_filter_elements *current; 945 if (!(current = ndb_filter_current_element(filter))) 946 return 0; 947 948 switch (current->field.type) { 949 case NDB_FILTER_IDS: 950 case NDB_FILTER_AUTHORS: 951 case NDB_FILTER_TAGS: 952 case NDB_FILTER_SEARCH: 953 return 0; 954 case NDB_FILTER_KINDS: 955 case NDB_FILTER_SINCE: 956 case NDB_FILTER_UNTIL: 957 case NDB_FILTER_LIMIT: 958 break; 959 } 960 961 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_INT)) 962 return 0; 963 964 el.integer = integer; 965 966 return ndb_filter_add_element(filter, el); 967 } 968 969 int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id) 970 { 971 union ndb_filter_element el; 972 struct ndb_filter_elements *current; 973 974 if (!(current = ndb_filter_current_element(filter))) 975 return 0; 976 977 // only certain filter types allow pushing id elements 978 switch (current->field.type) { 979 case NDB_FILTER_SINCE: 980 case NDB_FILTER_UNTIL: 981 case NDB_FILTER_LIMIT: 982 case NDB_FILTER_KINDS: 983 case NDB_FILTER_SEARCH: 984 return 0; 985 case NDB_FILTER_IDS: 986 case NDB_FILTER_AUTHORS: 987 case NDB_FILTER_TAGS: 988 break; 989 } 990 991 if (!ndb_filter_set_elem_type(filter, NDB_ELEMENT_ID)) 992 return 0; 993 994 // this is needed so that generic filters know its an id 995 el.id = id; 996 997 return ndb_filter_add_element(filter, el); 998 } 999 1000 static int ndb_tag_filter_matches(struct ndb_filter *filter, 1001 struct ndb_filter_elements *els, 1002 struct ndb_note *note) 1003 { 1004 int i; 1005 const unsigned char *id; 1006 const char *el_str; 1007 struct ndb_iterator iter, *it = &iter; 1008 struct ndb_str str; 1009 1010 
ndb_tags_iterate_start(note, it); 1011 1012 while (ndb_tags_iterate_next(it)) { 1013 // we're looking for tags with 2 or more entries: ["p", id], etc 1014 if (it->tag->count < 2) 1015 continue; 1016 1017 str = ndb_tag_str(note, it->tag, 0); 1018 1019 // we only care about packed strings (single char, etc) 1020 if (str.flag != NDB_PACKED_STR) 1021 continue; 1022 1023 // do we have #e matching e (or p, etc) 1024 if (str.str[0] != els->field.tag || str.str[1] != 0) 1025 continue; 1026 1027 str = ndb_tag_str(note, it->tag, 1); 1028 1029 switch (els->field.elem_type) { 1030 case NDB_ELEMENT_ID: 1031 // if our filter element type is an id, then we 1032 // expect a packed id in the tag, otherwise skip 1033 if (str.flag != NDB_PACKED_ID) 1034 continue; 1035 break; 1036 case NDB_ELEMENT_STRING: 1037 // if our filter element type is a string, then 1038 // we should not expect an id 1039 if (str.flag == NDB_PACKED_ID) 1040 continue; 1041 1042 break; 1043 case NDB_ELEMENT_UNKNOWN: 1044 default: 1045 // For some reason the element type is not set. It's 1046 // possible nothing was added to the generic filter? 
1047 // Let's just fail here and log a note for debugging 1048 fprintf(stderr, "UNUSUAL ndb_tag_filter_matches: have unknown element type %d\n", els->field.elem_type); 1049 return 0; 1050 } 1051 1052 for (i = 0; i < els->count; i++) { 1053 switch (els->field.elem_type) { 1054 case NDB_ELEMENT_ID: 1055 id = ndb_filter_get_id_element(filter, els, i); 1056 if (!memcmp(id, str.id, 32)) 1057 return 1; 1058 break; 1059 case NDB_ELEMENT_STRING: 1060 el_str = ndb_filter_get_string_element(filter, els, i); 1061 if (!strcmp(el_str, str.str)) 1062 return 1; 1063 break; 1064 case NDB_ELEMENT_INT: 1065 // int elements int tag queries are not supported 1066 case NDB_ELEMENT_UNKNOWN: 1067 return 0; 1068 } 1069 } 1070 } 1071 1072 return 0; 1073 } 1074 1075 struct search_id_state { 1076 struct ndb_filter *filter; 1077 struct ndb_filter_elements *els; 1078 unsigned char *key; 1079 }; 1080 1081 static int compare_ids(const void *pa, const void *pb) 1082 { 1083 unsigned char *a = *(unsigned char **)pa; 1084 unsigned char *b = *(unsigned char **)pb; 1085 1086 return memcmp(a, b, 32); 1087 } 1088 1089 static int search_ids(const void *ctx, const void *mid_ptr) 1090 { 1091 struct search_id_state *state; 1092 unsigned char *mid_id; 1093 uint32_t mid; 1094 1095 state = (struct search_id_state *)ctx; 1096 mid = *(uint32_t *)mid_ptr; 1097 1098 mid_id = ndb_filter_elements_data(state->filter, mid); 1099 assert(mid_id); 1100 1101 return memcmp(state->key, mid_id, 32); 1102 } 1103 1104 static int compare_kinds(const void *pa, const void *pb) 1105 { 1106 1107 // NOTE: this should match type in `union ndb_filter_element` 1108 uint64_t a = *(uint64_t *)pa; 1109 uint64_t b = *(uint64_t *)pb; 1110 1111 if (a < b) { 1112 return -1; 1113 } else if (a > b) { 1114 return 1; 1115 } else { 1116 return 0; 1117 } 1118 } 1119 1120 // 1121 // returns 1 if a filter matches a note 1122 static int ndb_filter_matches_with(struct ndb_filter *filter, 1123 struct ndb_note *note, int already_matched) 1124 { 1125 int 
i, j; 1126 struct ndb_filter_elements *els; 1127 struct search_id_state state; 1128 1129 state.filter = filter; 1130 1131 for (i = 0; i < filter->num_elements; i++) { 1132 els = ndb_filter_get_elements(filter, i); 1133 state.els = els; 1134 assert(els); 1135 1136 // if we know we already match from a query scan result, 1137 // we can skip this check 1138 if ((1 << els->field.type) & already_matched) 1139 continue; 1140 1141 switch (els->field.type) { 1142 case NDB_FILTER_KINDS: 1143 for (j = 0; j < els->count; j++) { 1144 if ((unsigned int)els->elements[j] == note->kind) 1145 goto cont; 1146 } 1147 break; 1148 case NDB_FILTER_IDS: 1149 state.key = ndb_note_id(note); 1150 if (bsearch(&state, &els->elements[0], els->count, 1151 sizeof(els->elements[0]), search_ids)) { 1152 continue; 1153 } 1154 break; 1155 case NDB_FILTER_AUTHORS: 1156 state.key = ndb_note_pubkey(note); 1157 if (bsearch(&state, &els->elements[0], els->count, 1158 sizeof(els->elements[0]), search_ids)) { 1159 continue; 1160 } 1161 break; 1162 case NDB_FILTER_TAGS: 1163 if (ndb_tag_filter_matches(filter, els, note)) 1164 continue; 1165 break; 1166 case NDB_FILTER_SINCE: 1167 assert(els->count == 1); 1168 if (note->created_at >= els->elements[0]) 1169 continue; 1170 break; 1171 case NDB_FILTER_UNTIL: 1172 assert(els->count == 1); 1173 if (note->created_at < els->elements[0]) 1174 continue; 1175 break; 1176 case NDB_FILTER_SEARCH: 1177 // TODO: matching search filters will need an accelerated 1178 // data structure, like our minimal perfect hashmap 1179 // idea for mutewords. 1180 // 1181 // We'll also want to store tokenized words in the filter 1182 // itself, so that we can walk over each word and check 1183 // the hashmap to see if the note contains at least 1184 // one word. 1185 // 1186 // For now we always return true, since we assume 1187 // the search index will be walked for these kinds 1188 // of queries. 
1189 continue; 1190 case NDB_FILTER_LIMIT: 1191 cont: 1192 continue; 1193 } 1194 1195 // all need to match 1196 return 0; 1197 } 1198 1199 return 1; 1200 } 1201 1202 int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note) 1203 { 1204 return ndb_filter_matches_with(filter, note, 0); 1205 } 1206 1207 // because elements are stored as offsets and qsort doesn't support context, 1208 // we do a dumb thing where we convert elements to pointers and back if we 1209 // are doing an id or string sort 1210 static void sort_filter_elements(struct ndb_filter *filter, 1211 struct ndb_filter_elements *els, 1212 int (*cmp)(const void *, const void *)) 1213 { 1214 int i; 1215 1216 assert(ndb_filter_elem_is_ptr(&els->field)); 1217 1218 for (i = 0; i < els->count; i++) 1219 els->elements[i] += (uint64_t)filter->data_buf.start; 1220 1221 qsort(&els->elements[0], els->count, sizeof(els->elements[0]), cmp); 1222 1223 for (i = 0; i < els->count; i++) 1224 els->elements[i] -= (uint64_t)filter->data_buf.start; 1225 } 1226 1227 static int ndb_filter_field_eq(struct ndb_filter *a_filt, 1228 struct ndb_filter_elements *a_field, 1229 struct ndb_filter *b_filt, 1230 struct ndb_filter_elements *b_field) 1231 { 1232 int i; 1233 const char *a_str, *b_str; 1234 unsigned char *a_id, *b_id; 1235 uint64_t a_int, b_int; 1236 1237 if (a_field->count != b_field->count) 1238 return 0; 1239 1240 if (a_field->field.type != b_field->field.type) { 1241 ndb_debug("UNUSUAL: field types do not match in ndb_filter_field_eq\n"); 1242 return 0; 1243 } 1244 1245 if (a_field->field.elem_type != b_field->field.elem_type) { 1246 ndb_debug("UNUSUAL: field element types do not match in ndb_filter_field_eq\n"); 1247 return 0; 1248 } 1249 1250 if (a_field->field.elem_type == NDB_ELEMENT_UNKNOWN) { 1251 ndb_debug("UNUSUAL: field element types are unknown\n"); 1252 return 0; 1253 } 1254 1255 for (i = 0; i < a_field->count; i++) { 1256 switch (a_field->field.elem_type) { 1257 case NDB_ELEMENT_UNKNOWN: 1258 
return 0; 1259 case NDB_ELEMENT_STRING: 1260 a_str = ndb_filter_get_string_element(a_filt, a_field, i); 1261 b_str = ndb_filter_get_string_element(b_filt, b_field, i); 1262 if (strcmp(a_str, b_str)) 1263 return 0; 1264 break; 1265 case NDB_ELEMENT_ID: 1266 a_id = ndb_filter_get_id_element(a_filt, a_field, i); 1267 b_id = ndb_filter_get_id_element(b_filt, b_field, i); 1268 if (memcmp(a_id, b_id, 32)) 1269 return 0; 1270 break; 1271 case NDB_ELEMENT_INT: 1272 a_int = ndb_filter_get_int_element(a_field, i); 1273 b_int = ndb_filter_get_int_element(b_field, i); 1274 if (a_int != b_int) 1275 return 0; 1276 break; 1277 } 1278 } 1279 1280 return 1; 1281 } 1282 1283 void ndb_filter_end_field(struct ndb_filter *filter) 1284 { 1285 int cur_offset; 1286 struct ndb_filter_elements *cur; 1287 1288 cur_offset = filter->current; 1289 1290 if (!(cur = ndb_filter_current_element(filter))) 1291 return; 1292 1293 filter->elements[filter->num_elements++] = cur_offset; 1294 1295 // sort elements for binary search 1296 switch (cur->field.type) { 1297 case NDB_FILTER_IDS: 1298 case NDB_FILTER_AUTHORS: 1299 sort_filter_elements(filter, cur, compare_ids); 1300 break; 1301 case NDB_FILTER_KINDS: 1302 qsort(&cur->elements[0], cur->count, 1303 sizeof(cur->elements[0]), compare_kinds); 1304 break; 1305 case NDB_FILTER_TAGS: 1306 // TODO: generic tag search sorting 1307 break; 1308 case NDB_FILTER_SINCE: 1309 case NDB_FILTER_UNTIL: 1310 case NDB_FILTER_LIMIT: 1311 case NDB_FILTER_SEARCH: 1312 // don't need to sort these 1313 break; 1314 } 1315 1316 filter->current = -1; 1317 1318 } 1319 1320 static void ndb_filter_group_init(struct ndb_filter_group *group) 1321 { 1322 group->num_filters = 0; 1323 } 1324 1325 static int ndb_filter_group_add(struct ndb_filter_group *group, 1326 struct ndb_filter *filter) 1327 { 1328 if (group->num_filters + 1 > MAX_FILTERS) 1329 return 0; 1330 1331 return ndb_filter_clone(&group->filters[group->num_filters++], filter); 1332 } 1333 1334 static int 
ndb_filter_group_matches(struct ndb_filter_group *group, 1335 struct ndb_note *note) 1336 { 1337 int i; 1338 struct ndb_filter *filter; 1339 1340 if (group->num_filters == 0) 1341 return 1; 1342 1343 for (i = 0; i < group->num_filters; i++) { 1344 filter = &group->filters[i]; 1345 1346 if (ndb_filter_matches(filter, note)) 1347 return 1; 1348 } 1349 1350 return 0; 1351 } 1352 1353 static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, 1354 uint64_t timestamp, const char *search) 1355 { 1356 memcpy(key->id, id, 32); 1357 key->timestamp = timestamp; 1358 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 1359 key->search[sizeof(key->search) - 1] = '\0'; 1360 } 1361 1362 static int ndb_write_profile_search_index(struct ndb_txn *txn, 1363 struct ndb_search_key *index_key, 1364 uint64_t profile_key) 1365 { 1366 int rc; 1367 MDB_val key, val; 1368 1369 key.mv_data = index_key; 1370 key.mv_size = sizeof(*index_key); 1371 val.mv_data = &profile_key; 1372 val.mv_size = sizeof(profile_key); 1373 1374 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 1375 &key, &val, 0))) 1376 { 1377 ndb_debug("ndb_write_profile_search_index failed: %s\n", 1378 mdb_strerror(rc)); 1379 return 0; 1380 } 1381 1382 return 1; 1383 } 1384 1385 1386 // map usernames and display names to profile keys for user searching 1387 static int ndb_write_profile_search_indices(struct ndb_txn *txn, 1388 struct ndb_note *note, 1389 uint64_t profile_key, 1390 void *profile_root) 1391 { 1392 struct ndb_search_key index; 1393 NdbProfileRecord_table_t profile_record; 1394 NdbProfile_table_t profile; 1395 1396 profile_record = NdbProfileRecord_as_root(profile_root); 1397 profile = NdbProfileRecord_profile_get(profile_record); 1398 1399 const char *name = NdbProfile_name_get(profile); 1400 const char *display_name = NdbProfile_display_name_get(profile); 1401 1402 // words + pubkey + created 1403 if (name) { 1404 ndb_make_search_key(&index, note->pubkey, 
note->created_at, 1405 name); 1406 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1407 return 0; 1408 } 1409 1410 if (display_name) { 1411 // don't write the same name/display_name twice 1412 if (name && !strcmp(display_name, name)) { 1413 return 1; 1414 } 1415 ndb_make_search_key(&index, note->pubkey, note->created_at, 1416 display_name); 1417 if (!ndb_write_profile_search_index(txn, &index, profile_key)) 1418 return 0; 1419 } 1420 1421 return 1; 1422 } 1423 1424 static inline void ndb_tsid_init(struct ndb_tsid *key, unsigned char *id, 1425 uint64_t timestamp) 1426 { 1427 memcpy(key->id, id, 32); 1428 key->timestamp = timestamp; 1429 } 1430 1431 static inline void ndb_tsid_low(struct ndb_tsid *key, unsigned char *id) 1432 { 1433 memcpy(key->id, id, 32); 1434 key->timestamp = 0; 1435 } 1436 1437 static inline void ndb_u64_ts_init(struct ndb_u64_ts *key, uint64_t integer, 1438 uint64_t timestamp) 1439 { 1440 key->u64 = integer; 1441 key->timestamp = timestamp; 1442 } 1443 1444 // useful for range-searching for the latest key with a clustered created_at timen 1445 static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id) 1446 { 1447 memcpy(key->id, id, 32); 1448 key->timestamp = UINT64_MAX; 1449 } 1450 1451 static int _ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn, int flags) 1452 { 1453 txn->lmdb = &ndb->lmdb; 1454 MDB_txn **mdb_txn = (MDB_txn **)&txn->mdb_txn; 1455 if (!txn->lmdb->env) 1456 return 0; 1457 return mdb_txn_begin(txn->lmdb->env, NULL, flags, mdb_txn) == 0; 1458 } 1459 1460 int ndb_begin_query(struct ndb *ndb, struct ndb_txn *txn) 1461 { 1462 return _ndb_begin_query(ndb, txn, MDB_RDONLY); 1463 } 1464 1465 // this should only be used in migrations, etc 1466 static int ndb_begin_rw_query(struct ndb *ndb, struct ndb_txn *txn) 1467 { 1468 return _ndb_begin_query(ndb, txn, 0); 1469 } 1470 1471 static int ndb_db_is_index(enum ndb_dbs index) 1472 { 1473 switch (index) { 1474 case NDB_DB_NOTE: 1475 case NDB_DB_META: 
1476 case NDB_DB_PROFILE: 1477 case NDB_DB_NOTE_ID: 1478 case NDB_DB_NDB_META: 1479 case NDB_DB_PROFILE_SEARCH: 1480 case NDB_DB_PROFILE_LAST_FETCH: 1481 case NDB_DB_NOTE_RELAYS: 1482 case NDB_DBS: 1483 return 0; 1484 case NDB_DB_PROFILE_PK: 1485 case NDB_DB_NOTE_KIND: 1486 case NDB_DB_NOTE_TEXT: 1487 case NDB_DB_NOTE_BLOCKS: 1488 case NDB_DB_NOTE_TAGS: 1489 case NDB_DB_NOTE_PUBKEY: 1490 case NDB_DB_NOTE_PUBKEY_KIND: 1491 case NDB_DB_NOTE_RELAY_KIND: 1492 return 1; 1493 } 1494 1495 return 0; 1496 } 1497 1498 static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key, 1499 unsigned char *id, uint64_t iu64, 1500 uint64_t timestamp) 1501 { 1502 memcpy(key->id, id, 32); 1503 key->u64 = iu64; 1504 key->timestamp = timestamp; 1505 } 1506 1507 // formats the relay url buffer for the NDB_DB_NOTE_RELAYS value. It's a 1508 // null terminated string padded to 8 bytes (we must keep the entire database 1509 // aligned to 8 bytes at all times) 1510 static int prepare_relay_buf(char *relay_buf, int bufsize, const char *relay, 1511 int relay_len) 1512 { 1513 struct cursor cur; 1514 1515 // make sure the size of the buffer is aligned 1516 assert((bufsize % 8) == 0); 1517 1518 make_cursor((unsigned char *)relay_buf, (unsigned char *)relay_buf + bufsize, &cur); 1519 1520 // push the relay string 1521 if (!cursor_push(&cur, (unsigned char *)relay, relay_len)) 1522 return 0; 1523 1524 // relay urls are null terminated for convenience 1525 if (!cursor_push_byte(&cur, 0)) 1526 return 0; 1527 1528 // align the buffer 1529 if (!cursor_align(&cur, 8)) 1530 return 0; 1531 1532 return cur.p - cur.start; 1533 } 1534 1535 // Write to the note_id -> relay_url database. 
This records where notes 1536 // have been seen 1537 static int ndb_write_note_relay(struct ndb_txn *txn, uint64_t note_key, 1538 const char *relay, int relay_len) 1539 { 1540 char relay_buf[256]; 1541 int rc, len; 1542 MDB_val k, v; 1543 1544 if (relay == NULL || relay_len == 0) 1545 return 0; 1546 1547 if (!(len = prepare_relay_buf(relay_buf, sizeof(relay_buf), relay, relay_len))) { 1548 fprintf(stderr, "relay url '%s' too large when writing note relay index\n", relay); 1549 return 0; 1550 } 1551 1552 assert((len % 8) == 0); 1553 1554 k.mv_data = ¬e_key; 1555 k.mv_size = sizeof(note_key); 1556 1557 v.mv_data = relay_buf; 1558 v.mv_size = len; 1559 1560 // NODUPDATA is specified so that we don't accidently add duplicate 1561 // key/value pairs 1562 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], 1563 &k, &v, MDB_NODUPDATA))) 1564 { 1565 ndb_debug("ndb_write_note_relay failed: %s\n", mdb_strerror(rc)); 1566 return 0; 1567 } 1568 1569 return 1; 1570 } 1571 1572 static int ndb_write_note_relay_kind_index(struct ndb_txn *txn, 1573 uint64_t kind, 1574 uint64_t note_key, 1575 uint64_t created_at, 1576 const char *relay, 1577 int relay_len) 1578 { 1579 // The relay kind key has a layout like so 1580 // 1581 // - note_key: 00 + 8 bytes 1582 // - kind: 08 + 8 bytes 1583 // - created_at: 16 + 8 bytes 1584 // - relay_url_size: 24 + 1 byte 1585 // - relay_url: 25 + n byte null-terminated string 1586 // - pad to 8 byte alignment 1587 1588 unsigned char buf[256]; 1589 int rc; 1590 struct cursor cur; 1591 MDB_val k, v; 1592 1593 // come on bro 1594 if (relay_len > 248) 1595 return 0; 1596 1597 if (relay == NULL || relay_len == 0) 1598 return 0; 1599 1600 make_cursor(buf, buf + sizeof(buf), &cur); 1601 1602 if (!cursor_push(&cur, (unsigned char *)¬e_key, 8)) return 0; 1603 if (!cursor_push(&cur, (unsigned char *)&kind, 8)) return 0; 1604 if (!cursor_push(&cur, (unsigned char *)&created_at, 8)) return 0; 1605 if (!cursor_push_byte(&cur, (uint8_t)relay_len)) 
return 0; 1606 if (!cursor_push(&cur, (unsigned char *)relay, relay_len)) return 0; 1607 if (!cursor_align(&cur, 8)) return 0; 1608 1609 assert(((cur.p-cur.start)%8) == 0); 1610 1611 k.mv_data = cur.start; 1612 k.mv_size = cur.p - cur.start; 1613 1614 v.mv_data = NULL; 1615 v.mv_size = 0; 1616 1617 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], &k, &v, 0))) { 1618 fprintf(stderr, "write note relay kind index failed: %s\n", 1619 mdb_strerror(rc)); 1620 return 0; 1621 } 1622 1623 return 1; 1624 } 1625 1626 static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note, 1627 uint64_t note_key) 1628 { 1629 int rc; 1630 struct ndb_tsid key; 1631 MDB_val k, v; 1632 1633 ndb_tsid_init(&key, ndb_note_pubkey(note), ndb_note_created_at(note)); 1634 1635 k.mv_data = &key; 1636 k.mv_size = sizeof(key); 1637 1638 v.mv_data = ¬e_key; 1639 v.mv_size = sizeof(note_key); 1640 1641 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY], &k, &v, 0))) { 1642 fprintf(stderr, "write note pubkey index failed: %s\n", 1643 mdb_strerror(rc)); 1644 return 0; 1645 } 1646 1647 return 1; 1648 } 1649 1650 static int ndb_write_note_pubkey_kind_index(struct ndb_txn *txn, 1651 struct ndb_note *note, 1652 uint64_t note_key) 1653 { 1654 int rc; 1655 struct ndb_id_u64_ts key; 1656 MDB_val k, v; 1657 1658 ndb_id_u64_ts_init(&key, ndb_note_pubkey(note), ndb_note_kind(note), 1659 ndb_note_created_at(note)); 1660 1661 k.mv_data = &key; 1662 k.mv_size = sizeof(key); 1663 1664 v.mv_data = ¬e_key; 1665 v.mv_size = sizeof(note_key); 1666 1667 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], &k, &v, 0))) { 1668 fprintf(stderr, "write note pubkey_kind index failed: %s\n", 1669 mdb_strerror(rc)); 1670 return 0; 1671 } 1672 1673 return 1; 1674 } 1675 1676 1677 static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, int num_indices) 1678 { 1679 MDB_val k, v; 1680 MDB_cursor *cur; 1681 int i, drop_dbi, count, rc; 
1682 uint64_t note_key; 1683 struct ndb_note *note; 1684 enum ndb_dbs index; 1685 1686 // 0 means empty, not delete the dbi 1687 drop_dbi = 0; 1688 1689 // ensure they are all index dbs 1690 for (i = 0; i < num_indices; i++) { 1691 if (!ndb_db_is_index(indices[i])) { 1692 fprintf(stderr, "ndb_rebuild_note_index: %s is not an index db\n", ndb_db_name(indices[i])); 1693 return -1; 1694 } 1695 } 1696 1697 // empty the index dbs before we rebuild 1698 for (i = 0; i < num_indices; i++) { 1699 index = indices[i]; 1700 if (mdb_drop(txn->mdb_txn, index, drop_dbi)) { 1701 fprintf(stderr, "ndb_rebuild_pubkey_index: mdb_drop failed for %s\n", ndb_db_name(index)); 1702 return -1; 1703 } 1704 } 1705 1706 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE], &cur))) { 1707 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1708 return -1; 1709 } 1710 1711 count = 0; 1712 1713 // loop through all notes and write search indices 1714 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1715 note = v.mv_data; 1716 note_key = *((uint64_t*)k.mv_data); 1717 1718 for (i = 0; i < num_indices; i++) { 1719 index = indices[i]; 1720 switch (index) { 1721 case NDB_DB_NOTE: 1722 case NDB_DB_META: 1723 case NDB_DB_PROFILE: 1724 case NDB_DB_NOTE_ID: 1725 case NDB_DB_NDB_META: 1726 case NDB_DB_PROFILE_SEARCH: 1727 case NDB_DB_PROFILE_LAST_FETCH: 1728 case NDB_DB_NOTE_RELAYS: 1729 case NDB_DBS: 1730 // this should never happen since we check at 1731 // the start 1732 count = -1; 1733 goto cleanup; 1734 case NDB_DB_PROFILE_PK: 1735 case NDB_DB_NOTE_KIND: 1736 case NDB_DB_NOTE_TEXT: 1737 case NDB_DB_NOTE_BLOCKS: 1738 case NDB_DB_NOTE_TAGS: 1739 fprintf(stderr, "%s index rebuild not supported yet. 
sorry.\n", ndb_db_name(index)); 1740 count = -1; 1741 goto cleanup; 1742 case NDB_DB_NOTE_PUBKEY: 1743 if (!ndb_write_note_pubkey_index(txn, note, note_key)) { 1744 count = -1; 1745 goto cleanup; 1746 } 1747 break; 1748 case NDB_DB_NOTE_RELAY_KIND: 1749 fprintf(stderr, "it doesn't make sense to rebuild note relay kind index\n"); 1750 return 0; 1751 case NDB_DB_NOTE_PUBKEY_KIND: 1752 if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) { 1753 count = -1; 1754 goto cleanup; 1755 } 1756 break; 1757 } 1758 } 1759 1760 count++; 1761 } 1762 1763 cleanup: 1764 mdb_cursor_close(cur); 1765 1766 return count; 1767 } 1768 1769 1770 // Migrations 1771 // 1772 1773 // This was before we had note_profile_pubkey{,_kind} indices. Let's create them. 1774 static int ndb_migrate_profile_indices(struct ndb_txn *txn) 1775 { 1776 int count; 1777 1778 enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; 1779 if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) { 1780 fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); 1781 return 1; 1782 } else { 1783 fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); 1784 return 0; 1785 } 1786 } 1787 1788 static int ndb_migrate_user_search_indices(struct ndb_txn *txn) 1789 { 1790 int rc; 1791 MDB_cursor *cur; 1792 MDB_val k, v; 1793 void *profile_root; 1794 NdbProfileRecord_table_t record; 1795 struct ndb_note *note; 1796 uint64_t note_key, profile_key; 1797 size_t len; 1798 int count; 1799 1800 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 1801 fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); 1802 return 0; 1803 } 1804 1805 count = 0; 1806 1807 // loop through all profiles and write search indices 1808 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 1809 profile_root = v.mv_data; 1810 profile_key = *((uint64_t*)k.mv_data); 1811 record = 
NdbProfileRecord_as_root(profile_root); 1812 note_key = NdbProfileRecord_note_key(record); 1813 note = ndb_get_note_by_key(txn, note_key, &len); 1814 1815 if (note == NULL) { 1816 continue; 1817 } 1818 1819 if (!ndb_write_profile_search_indices(txn, note, profile_key, 1820 profile_root)) { 1821 fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); 1822 continue; 1823 } 1824 1825 count++; 1826 } 1827 1828 fprintf(stderr, "migrated %d profiles to include search indices\n", count); 1829 1830 mdb_cursor_close(cur); 1831 1832 return 1; 1833 } 1834 1835 static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn) 1836 { 1837 // just drop the search db so we can rebuild it 1838 if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) { 1839 fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); 1840 return 0; 1841 } 1842 1843 return ndb_migrate_user_search_indices(txn); 1844 } 1845 1846 int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); 1847 1848 1849 int ndb_db_version(struct ndb_txn *txn) 1850 { 1851 uint64_t version, version_key; 1852 MDB_val k, v; 1853 1854 version_key = NDB_META_KEY_VERSION; 1855 k.mv_data = &version_key; 1856 k.mv_size = sizeof(version_key); 1857 1858 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) { 1859 version = -1; 1860 } else { 1861 if (v.mv_size != 8) { 1862 fprintf(stderr, "run_migrations: invalid version size?"); 1863 return 0; 1864 } 1865 version = *((uint64_t*)v.mv_data); 1866 } 1867 1868 return version; 1869 } 1870 1871 // custom pubkey+kind+timestamp comparison function. 
This is used by lmdb to 1872 // perform b+ tree searches over the pubkey+kind+timestamp index 1873 static int ndb_id_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1874 { 1875 struct ndb_id_u64_ts *tsa, *tsb; 1876 MDB_val a2 = *a, b2 = *b; 1877 1878 a2.mv_size = sizeof(tsa->id); 1879 b2.mv_size = sizeof(tsb->id); 1880 1881 int cmp = mdb_cmp_memn(&a2, &b2); 1882 if (cmp) return cmp; 1883 1884 tsa = a->mv_data; 1885 tsb = b->mv_data; 1886 1887 if (tsa->u64 < tsb->u64) 1888 return -1; 1889 else if (tsa->u64 > tsb->u64) 1890 return 1; 1891 1892 if (tsa->timestamp < tsb->timestamp) 1893 return -1; 1894 else if (tsa->timestamp > tsb->timestamp) 1895 return 1; 1896 1897 return 0; 1898 } 1899 1900 // custom kind+timestamp comparison function. This is used by lmdb to perform 1901 // b+ tree searches over the kind+timestamp index 1902 static int ndb_u64_ts_compare(const MDB_val *a, const MDB_val *b) 1903 { 1904 struct ndb_u64_ts *tsa, *tsb; 1905 tsa = a->mv_data; 1906 tsb = b->mv_data; 1907 1908 if (tsa->u64 < tsb->u64) 1909 return -1; 1910 else if (tsa->u64 > tsb->u64) 1911 return 1; 1912 1913 if (tsa->timestamp < tsb->timestamp) 1914 return -1; 1915 else if (tsa->timestamp > tsb->timestamp) 1916 return 1; 1917 1918 return 0; 1919 } 1920 1921 static int ndb_tsid_compare(const MDB_val *a, const MDB_val *b) 1922 { 1923 struct ndb_tsid *tsa, *tsb; 1924 MDB_val a2 = *a, b2 = *b; 1925 1926 a2.mv_size = sizeof(tsa->id); 1927 b2.mv_size = sizeof(tsb->id); 1928 1929 int cmp = mdb_cmp_memn(&a2, &b2); 1930 if (cmp) return cmp; 1931 1932 tsa = a->mv_data; 1933 tsb = b->mv_data; 1934 1935 if (tsa->timestamp < tsb->timestamp) 1936 return -1; 1937 else if (tsa->timestamp > tsb->timestamp) 1938 return 1; 1939 return 0; 1940 } 1941 1942 enum ndb_ingester_msgtype { 1943 NDB_INGEST_EVENT, // write json to the ingester queue for processing 1944 NDB_INGEST_QUIT, // kill ingester thread immediately 1945 }; 1946 1947 struct ndb_ingester_event { 1948 const char *relay; 1949 char *json; 1950 
unsigned client : 1; // ["EVENT", {...}] messages 1951 unsigned len : 31; 1952 }; 1953 1954 struct ndb_writer_note_relay { 1955 const char *relay; 1956 uint64_t note_key; 1957 uint64_t kind; 1958 uint64_t created_at; 1959 }; 1960 1961 struct ndb_writer_note { 1962 struct ndb_note *note; 1963 size_t note_len; 1964 const char *relay; 1965 }; 1966 1967 struct ndb_writer_profile { 1968 struct ndb_writer_note note; 1969 struct ndb_profile_record_builder record; 1970 }; 1971 1972 struct ndb_ingester_msg { 1973 enum ndb_ingester_msgtype type; 1974 union { 1975 struct ndb_ingester_event event; 1976 }; 1977 }; 1978 1979 struct ndb_writer_ndb_meta { 1980 // these are 64 bit because I'm paranoid of db-wide alignment issues 1981 uint64_t version; 1982 }; 1983 1984 // Used in the writer thread when writing ndb_profile_fetch_record's 1985 // kv = pubkey: recor 1986 struct ndb_writer_last_fetch { 1987 unsigned char pubkey[32]; 1988 uint64_t fetched_at; 1989 }; 1990 1991 // write note blocks 1992 struct ndb_writer_blocks { 1993 struct ndb_blocks *blocks; 1994 uint64_t note_key; 1995 }; 1996 1997 // The different types of messages that the writer thread can write to the 1998 // database 1999 struct ndb_writer_msg { 2000 enum ndb_writer_msgtype type; 2001 union { 2002 struct ndb_writer_note_relay note_relay; 2003 struct ndb_writer_note note; 2004 struct ndb_writer_profile profile; 2005 struct ndb_writer_ndb_meta ndb_meta; 2006 struct ndb_writer_last_fetch last_fetch; 2007 struct ndb_writer_blocks blocks; 2008 }; 2009 }; 2010 2011 static inline int ndb_writer_queue_msg(struct ndb_writer *writer, 2012 struct ndb_writer_msg *msg) 2013 { 2014 return prot_queue_push(&writer->inbox, msg); 2015 } 2016 2017 static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags); 2018 static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn) 2019 { 2020 int rc; 2021 MDB_cursor *cur; 2022 MDB_val 
k, v; 2023 void *profile_root; 2024 NdbProfileRecord_table_t record; 2025 struct ndb_note *note, *copied_note; 2026 uint64_t note_key; 2027 size_t len; 2028 int count, failed, ret; 2029 struct ndb_writer_profile profile; 2030 2031 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { 2032 fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); 2033 return 0; 2034 } 2035 2036 size_t scratch_size = 8 * 1024 * 1024; 2037 unsigned char *scratch = malloc(scratch_size); 2038 2039 ret = 1; 2040 count = 0; 2041 failed = 0; 2042 2043 // loop through all profiles and write search indices 2044 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 2045 profile_root = v.mv_data; 2046 record = NdbProfileRecord_as_root(profile_root); 2047 note_key = NdbProfileRecord_note_key(record); 2048 note = ndb_get_note_by_key(txn, note_key, &len); 2049 2050 if (note == NULL) { 2051 failed++; 2052 continue; 2053 } 2054 2055 struct ndb_profile_record_builder *b = &profile.record; 2056 2057 // reprocess profile 2058 if (!ndb_process_profile_note(note, b)) { 2059 failed++; 2060 continue; 2061 } 2062 2063 // the writer needs to own this note, and its expected to free it 2064 copied_note = malloc(len); 2065 memcpy(copied_note, note, len); 2066 2067 profile.note.note = copied_note; 2068 profile.note.note_len = len; 2069 2070 // we don't pass in flags when migrating... a bit sketchy but 2071 // whatever. 
noone is using this to customize nostrdb atm 2072 if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) { 2073 count++; 2074 } 2075 } 2076 2077 fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); 2078 2079 if (failed != 0) { 2080 fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); 2081 } 2082 2083 free(scratch); 2084 mdb_cursor_close(cur); 2085 2086 return ret; 2087 } 2088 2089 static struct ndb_migration MIGRATIONS[] = { 2090 { .fn = ndb_migrate_user_search_indices }, 2091 { .fn = ndb_migrate_lower_user_search_indices }, 2092 { .fn = ndb_migrate_utf8_profile_names }, 2093 { .fn = ndb_migrate_profile_indices }, 2094 }; 2095 2096 2097 int ndb_end_query(struct ndb_txn *txn) 2098 { 2099 // this works on read or write queries. 2100 return mdb_txn_commit(txn->mdb_txn) == 0; 2101 } 2102 2103 int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32], 2104 unsigned char sig[64]) 2105 { 2106 secp256k1_xonly_pubkey xonly_pubkey; 2107 int ok; 2108 2109 ok = secp256k1_xonly_pubkey_parse((secp256k1_context*)ctx, &xonly_pubkey, 2110 pubkey) != 0; 2111 if (!ok) return 0; 2112 2113 ok = secp256k1_schnorrsig_verify((secp256k1_context*)ctx, sig, id, 32, 2114 &xonly_pubkey) > 0; 2115 if (!ok) return 0; 2116 2117 return 1; 2118 } 2119 2120 static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, 2121 const unsigned char *pubkey, 2122 uint64_t fetched_at) 2123 { 2124 int rc; 2125 MDB_val key, val; 2126 2127 key.mv_data = (unsigned char*)pubkey; 2128 key.mv_size = 32; 2129 val.mv_data = &fetched_at; 2130 val.mv_size = sizeof(fetched_at); 2131 2132 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], 2133 &key, &val, 0))) 2134 { 2135 ndb_debug("write version to ndb_meta failed: %s\n", 2136 mdb_strerror(rc)); 2137 return; 2138 } 2139 2140 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 2141 } 2142 2143 2144 // We just received a profile that we haven't 
processed yet, but it could 2145 // be an older one! Make sure we only write last fetched profile if it's a new 2146 // one 2147 // 2148 // To do this, we first check the latest profile in the database. If the 2149 // created_date for this profile note is newer, then we write a 2150 // last_profile_fetch record, otherwise we do not. 2151 // 2152 // WARNING: This function is only valid when called from the writer thread 2153 static int ndb_maybe_write_last_profile_fetch(struct ndb_txn *txn, 2154 struct ndb_note *note) 2155 { 2156 size_t len; 2157 uint64_t profile_key, note_key; 2158 void *root; 2159 struct ndb_note *last_profile; 2160 NdbProfileRecord_table_t record; 2161 2162 if ((root = ndb_get_profile_by_pubkey(txn, note->pubkey, &len, &profile_key))) { 2163 record = NdbProfileRecord_as_root(root); 2164 note_key = NdbProfileRecord_note_key(record); 2165 last_profile = ndb_get_note_by_key(txn, note_key, &len); 2166 if (last_profile == NULL) { 2167 return 0; 2168 } 2169 2170 // found profile, let's see if it's newer than ours 2171 if (note->created_at > last_profile->created_at) { 2172 // this is a new profile note, record last fetched time 2173 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 2174 } 2175 } else { 2176 // couldn't fetch profile. record last fetched time 2177 ndb_writer_last_profile_fetch(txn, note->pubkey, time(NULL)); 2178 } 2179 2180 return 1; 2181 } 2182 2183 int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey, 2184 uint64_t fetched_at) 2185 { 2186 struct ndb_writer_msg msg; 2187 msg.type = NDB_WRITER_PROFILE_LAST_FETCH; 2188 memcpy(&msg.last_fetch.pubkey[0], pubkey, 32); 2189 msg.last_fetch.fetched_at = fetched_at; 2190 2191 return ndb_writer_queue_msg(&ndb->writer, &msg); 2192 } 2193 2194 2195 // When doing cursor scans from greatest to lowest, this function positions the 2196 // cursor at the first element before descending. 
MDB_SET_RANGE puts us right 2197 // after the first element, so we have to go back one. 2198 static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v) 2199 { 2200 // Position cursor at the next key greater than or equal to the 2201 // specified key 2202 if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) { 2203 // Failed :(. It could be the last element? 2204 if (mdb_cursor_get(cur, k, v, MDB_LAST)) 2205 return 0; 2206 } else { 2207 // if set range worked and our key exists, it should be 2208 // the one right before this one 2209 if (mdb_cursor_get(cur, k, v, MDB_PREV)) 2210 return 0; 2211 } 2212 2213 return 1; 2214 } 2215 2216 2217 // get some value based on a clustered id key 2218 int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, 2219 MDB_val *val) 2220 { 2221 MDB_val k, v; 2222 MDB_cursor *cur; 2223 int success = 0, rc; 2224 struct ndb_tsid tsid; 2225 2226 // position at the most recent 2227 ndb_tsid_high(&tsid, id); 2228 2229 k.mv_data = &tsid; 2230 k.mv_size = sizeof(tsid); 2231 2232 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[db], &cur))) { 2233 ndb_debug("ndb_get_tsid: failed to open cursor: '%s'\n", mdb_strerror(rc)); 2234 return 0; 2235 } 2236 2237 if (!ndb_cursor_start(cur, &k, &v)) 2238 goto cleanup; 2239 2240 if (memcmp(k.mv_data, id, 32) == 0) { 2241 *val = v; 2242 success = 1; 2243 } 2244 2245 cleanup: 2246 mdb_cursor_close(cur); 2247 return success; 2248 } 2249 2250 static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key, 2251 enum ndb_dbs store, size_t *len) 2252 { 2253 MDB_val k, v; 2254 2255 k.mv_data = &key; 2256 k.mv_size = sizeof(key); 2257 2258 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2259 ndb_debug("ndb_lookup_by_key: mdb_get note failed\n"); 2260 return NULL; 2261 } 2262 2263 if (len) 2264 *len = v.mv_size; 2265 2266 return v.mv_data; 2267 } 2268 2269 static void *ndb_lookup_tsid(struct ndb_txn *txn, enum ndb_dbs ind, 2270 enum ndb_dbs store, const unsigned char *pk, 
2271 size_t *len, uint64_t *primkey) 2272 { 2273 MDB_val k, v; 2274 void *res = NULL; 2275 if (len) 2276 *len = 0; 2277 2278 if (!ndb_get_tsid(txn, ind, pk, &k)) { 2279 //ndb_debug("ndb_get_profile_by_pubkey: ndb_get_tsid failed\n"); 2280 return 0; 2281 } 2282 2283 if (primkey) 2284 *primkey = *(uint64_t*)k.mv_data; 2285 2286 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) { 2287 ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n"); 2288 return 0; 2289 } 2290 2291 res = v.mv_data; 2292 assert(((uint64_t)res % 4) == 0); 2293 if (len) 2294 *len = v.mv_size; 2295 return res; 2296 } 2297 2298 void *ndb_get_profile_by_pubkey(struct ndb_txn *txn, const unsigned char *pk, size_t *len, uint64_t *key) 2299 { 2300 return ndb_lookup_tsid(txn, NDB_DB_PROFILE_PK, NDB_DB_PROFILE, pk, len, key); 2301 } 2302 2303 struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *key) 2304 { 2305 return ndb_lookup_tsid(txn, NDB_DB_NOTE_ID, NDB_DB_NOTE, id, len, key); 2306 } 2307 2308 static inline uint64_t ndb_get_indexkey_by_id(struct ndb_txn *txn, 2309 enum ndb_dbs db, 2310 const unsigned char *id) 2311 { 2312 MDB_val k; 2313 2314 if (!ndb_get_tsid(txn, db, id, &k)) 2315 return 0; 2316 2317 return *(uint32_t*)k.mv_data; 2318 } 2319 2320 uint64_t ndb_get_notekey_by_id(struct ndb_txn *txn, const unsigned char *id) 2321 { 2322 return ndb_get_indexkey_by_id(txn, NDB_DB_NOTE_ID, id); 2323 } 2324 2325 uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *id) 2326 { 2327 return ndb_get_indexkey_by_id(txn, NDB_DB_PROFILE_PK, id); 2328 } 2329 2330 struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2331 { 2332 return ndb_lookup_by_key(txn, key, NDB_DB_NOTE, len); 2333 } 2334 2335 void *ndb_get_profile_by_key(struct ndb_txn *txn, uint64_t key, size_t *len) 2336 { 2337 return ndb_lookup_by_key(txn, key, NDB_DB_PROFILE, len); 2338 } 2339 2340 uint64_t 2341 
ndb_read_last_profile_fetch(struct ndb_txn *txn, const unsigned char *pubkey) 2342 { 2343 MDB_val k, v; 2344 2345 k.mv_data = (unsigned char*)pubkey; 2346 k.mv_size = 32; 2347 2348 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH], &k, &v)) { 2349 //ndb_debug("ndb_read_last_profile_fetch: mdb_get note failed\n"); 2350 return 0; 2351 } 2352 2353 return *((uint64_t*)v.mv_data); 2354 } 2355 2356 2357 static int ndb_has_note(struct ndb_txn *txn, const unsigned char *id) 2358 { 2359 MDB_val val; 2360 2361 if (!ndb_get_tsid(txn, NDB_DB_NOTE_ID, id, &val)) 2362 return 0; 2363 2364 return 1; 2365 } 2366 2367 static void ndb_txn_from_mdb(struct ndb_txn *txn, struct ndb_lmdb *lmdb, 2368 MDB_txn *mdb_txn) 2369 { 2370 txn->lmdb = lmdb; 2371 txn->mdb_txn = mdb_txn; 2372 } 2373 2374 static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) 2375 { 2376 unsigned char id[32]; 2377 struct ndb_ingest_controller *c = data; 2378 struct ndb_txn txn; 2379 2380 hex_decode(hexid, 64, id, sizeof(id)); 2381 2382 // let's see if we already have it 2383 ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); 2384 c->note = ndb_get_note_by_id(&txn, id, NULL, &c->note_key); 2385 2386 if (c->note == NULL) 2387 return NDB_IDRES_CONT; 2388 2389 return NDB_IDRES_STOP; 2390 } 2391 2392 static int ndbprofile_parse_json(flatcc_builder_t *B, 2393 const char *buf, size_t bufsiz, int flags, NdbProfile_ref_t *profile) 2394 { 2395 flatcc_json_parser_t parser, *ctx = &parser; 2396 flatcc_json_parser_init(ctx, B, buf, buf + bufsiz, flags); 2397 2398 if (flatcc_builder_start_buffer(B, 0, 0, 0)) 2399 return 0; 2400 2401 NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile); 2402 if (ctx->error) 2403 return 0; 2404 2405 if (!flatcc_builder_end_buffer(B, *profile)) 2406 return 0; 2407 2408 ctx->end_loc = buf; 2409 2410 2411 return 1; 2412 } 2413 2414 void ndb_profile_record_builder_init(struct ndb_profile_record_builder *b) 2415 { 2416 b->builder = 
malloc(sizeof(*b->builder)); 2417 b->flatbuf = NULL; 2418 } 2419 2420 void ndb_profile_record_builder_free(struct ndb_profile_record_builder *b) 2421 { 2422 if (b->builder) 2423 free(b->builder); 2424 if (b->flatbuf) 2425 free(b->flatbuf); 2426 2427 b->builder = NULL; 2428 b->flatbuf = NULL; 2429 } 2430 2431 int ndb_process_profile_note(struct ndb_note *note, 2432 struct ndb_profile_record_builder *profile) 2433 { 2434 int res; 2435 2436 NdbProfile_ref_t profile_table; 2437 flatcc_builder_t *builder; 2438 2439 ndb_profile_record_builder_init(profile); 2440 builder = profile->builder; 2441 flatcc_builder_init(builder); 2442 2443 NdbProfileRecord_start_as_root(builder); 2444 2445 //printf("parsing profile '%.*s'\n", note->content_length, ndb_note_content(note)); 2446 if (!(res = ndbprofile_parse_json(builder, ndb_note_content(note), 2447 note->content_length, 2448 flatcc_json_parser_f_skip_unknown, 2449 &profile_table))) 2450 { 2451 ndb_debug("profile_parse_json failed %d '%.*s'\n", res, 2452 note->content_length, ndb_note_content(note)); 2453 ndb_profile_record_builder_free(profile); 2454 return 0; 2455 } 2456 2457 uint64_t received_at = time(NULL); 2458 const char *lnurl = "fixme"; 2459 2460 NdbProfileRecord_profile_add(builder, profile_table); 2461 NdbProfileRecord_received_at_add(builder, received_at); 2462 2463 flatcc_builder_ref_t lnurl_off; 2464 lnurl_off = flatcc_builder_create_string_str(builder, lnurl); 2465 2466 NdbProfileRecord_lnurl_add(builder, lnurl_off); 2467 2468 //*profile = flatcc_builder_finalize_aligned_buffer(builder, profile_len); 2469 return 1; 2470 } 2471 2472 static int ndb_ingester_queue_event(struct ndb_ingester *ingester, 2473 char *json, unsigned len, 2474 unsigned client, const char *relay) 2475 { 2476 struct ndb_ingester_msg msg; 2477 msg.type = NDB_INGEST_EVENT; 2478 2479 msg.event.json = json; 2480 msg.event.len = len; 2481 msg.event.client = client; 2482 msg.event.relay = relay; 2483 2484 return threadpool_dispatch(&ingester->tp, 
&msg); 2485 } 2486 2487 void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay) 2488 { 2489 meta->client = client; 2490 meta->relay = relay; 2491 } 2492 2493 static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json, 2494 int len, struct ndb_ingest_meta *meta) 2495 { 2496 const char *relay = meta->relay; 2497 2498 // Without this, we get bus errors in the json parser inside when 2499 // trying to ingest empty kind 6 reposts... we should probably do fuzz 2500 // testing on inputs to the json parser 2501 if (len == 0) 2502 return 0; 2503 2504 // Since we need to return as soon as possible, and we're not 2505 // making any assumptions about the lifetime of the string, we 2506 // definitely need to copy the json here. In the future once we 2507 // have our thread that manages a websocket connection, we can 2508 // avoid the copy and just use the buffer we get from that 2509 // thread. 2510 char *json_copy = strdupn(json, len); 2511 if (json_copy == NULL) 2512 return 0; 2513 2514 if (relay != NULL) { 2515 relay = strdup(meta->relay); 2516 if (relay == NULL) 2517 return 0; 2518 } 2519 2520 return ndb_ingester_queue_event(ingester, json_copy, len, meta->client, relay); 2521 } 2522 2523 2524 static int ndb_ingester_process_note(secp256k1_context *ctx, 2525 struct ndb_note *note, 2526 size_t note_size, 2527 struct ndb_writer_msg *out, 2528 struct ndb_ingester *ingester, 2529 const char *relay) 2530 { 2531 enum ndb_ingest_filter_action action; 2532 struct ndb_ingest_meta meta; 2533 2534 action = NDB_INGEST_ACCEPT; 2535 2536 if (ingester->filter) 2537 action = ingester->filter(ingester->filter_context, note); 2538 2539 if (action == NDB_INGEST_REJECT) 2540 return 0; 2541 2542 // some special situations we might want to skip sig validation, 2543 // like during large imports 2544 if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) { 2545 // if we're skipping validation we don't need to 
verify 2546 } else { 2547 // verify! If it's an invalid note we don't need to 2548 // bother writing it to the database 2549 if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { 2550 ndb_debug("signature verification failed\n"); 2551 return 0; 2552 } 2553 } 2554 2555 // we didn't find anything. let's send it 2556 // to the writer thread 2557 note = realloc(note, note_size); 2558 assert(((uint64_t)note % 4) == 0); 2559 2560 if (note->kind == 0) { 2561 struct ndb_profile_record_builder *b = 2562 &out->profile.record; 2563 2564 ndb_process_profile_note(note, b); 2565 2566 out->type = NDB_WRITER_PROFILE; 2567 out->profile.note.note = note; 2568 out->profile.note.note_len = note_size; 2569 return 1; 2570 } else if (note->kind == 6) { 2571 // process the repost if we have a repost event 2572 ndb_debug("processing kind 6 repost\n"); 2573 // dup the relay string 2574 ndb_ingest_meta_init(&meta, 0, relay); 2575 ndb_ingest_event(ingester, ndb_note_content(note), 2576 ndb_note_content_length(note), 2577 &meta); 2578 } 2579 2580 out->type = NDB_WRITER_NOTE; 2581 out->note.note = note; 2582 out->note.note_len = note_size; 2583 out->note.relay = relay; 2584 2585 return 1; 2586 } 2587 2588 int ndb_note_seen_on_relay(struct ndb_txn *txn, uint64_t note_key, const char *relay) 2589 { 2590 MDB_val k, v; 2591 MDB_cursor *cur; 2592 int rc, len; 2593 char relay_buf[256]; 2594 2595 if (relay == NULL) 2596 return 0; 2597 2598 len = strlen(relay); 2599 2600 if (!(len = prepare_relay_buf(relay_buf, sizeof(relay_buf), relay, len))) 2601 return 0; 2602 2603 assert((len % 8) == 0); 2604 2605 k.mv_data = ¬e_key; 2606 k.mv_size = sizeof(note_key); 2607 2608 v.mv_data = relay_buf; 2609 v.mv_size = len; 2610 2611 if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], &cur)) != MDB_SUCCESS) 2612 return 0; 2613 2614 rc = mdb_cursor_get(cur, &k, &v, MDB_GET_BOTH); 2615 mdb_cursor_close(cur); 2616 2617 return rc == MDB_SUCCESS; 2618 } 2619 2620 // process the relay for 
the note. this is called when we already have the 2621 // note in the database but still need to check if the relay needs to be 2622 // written to the relay indexes for corresponding note 2623 static int ndb_process_note_relay(struct ndb_txn *txn, struct ndb_writer_msg *out, 2624 uint64_t note_key, struct ndb_note *note, 2625 const char *relay) 2626 { 2627 // query to see if we already have the relay on this note 2628 if (ndb_note_seen_on_relay(txn, note_key, relay)) { 2629 return 0; 2630 } 2631 2632 // if not, tell the writer thread to emit a NOTE_RELAY event 2633 out->type = NDB_WRITER_NOTE_RELAY; 2634 2635 out->note_relay.relay = relay; 2636 out->note_relay.note_key = note_key; 2637 out->note_relay.kind = ndb_note_kind(note); 2638 out->note_relay.created_at = ndb_note_created_at(note); 2639 2640 return 1; 2641 } 2642 2643 static int ndb_ingester_process_event(secp256k1_context *ctx, 2644 struct ndb_ingester *ingester, 2645 struct ndb_ingester_event *ev, 2646 struct ndb_writer_msg *out, 2647 MDB_txn *read_txn) 2648 { 2649 struct ndb_tce tce; 2650 struct ndb_fce fce; 2651 struct ndb_note *note; 2652 struct ndb_ingest_controller controller; 2653 struct ndb_id_cb cb; 2654 void *buf; 2655 int ok; 2656 size_t bufsize, note_size; 2657 2658 ok = 0; 2659 2660 // we will use this to check if we already have it in the DB during 2661 // ID parsing 2662 controller.read_txn = read_txn; 2663 controller.lmdb = ingester->lmdb; 2664 cb.fn = ndb_ingester_json_controller; 2665 cb.data = &controller; 2666 2667 // since we're going to be passing this allocated note to a different 2668 // thread, we can't use thread-local buffers. just allocate a block 2669 bufsize = max(ev->len * 8.0, 4096); 2670 buf = malloc(bufsize); 2671 if (!buf) { 2672 ndb_debug("couldn't malloc buf\n"); 2673 return 0; 2674 } 2675 2676 note_size = 2677 ev->client ? 
2678 ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : 2679 ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); 2680 2681 // This is a result from our special json parser. It parsed the id 2682 // and found that we already have it in the database 2683 if ((int)note_size == -42) { 2684 assert(controller.note != NULL); 2685 assert(controller.note_key != 0); 2686 struct ndb_txn txn; 2687 ndb_txn_from_mdb(&txn, ingester->lmdb, read_txn); 2688 2689 // we still need to process the relays on the note even 2690 // if we already have it 2691 if (ev->relay && ndb_process_note_relay(&txn, out, 2692 controller.note_key, 2693 controller.note, 2694 ev->relay)) 2695 { 2696 // free note buf here since we don't pass the note to the writer thread 2697 free(buf); 2698 goto success; 2699 } else { 2700 // we already have the note and there are no new 2701 // relays to process. nothing to write. 2702 goto cleanup; 2703 } 2704 } else if (note_size == 0) { 2705 ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); 2706 goto cleanup; 2707 } 2708 2709 //ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json); 2710 2711 if (ev->client) { 2712 switch (fce.evtype) { 2713 case NDB_FCE_EVENT: 2714 note = fce.event.note; 2715 if (note != buf) { 2716 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2717 goto cleanup; 2718 } 2719 2720 if (!ndb_ingester_process_note(ctx, note, note_size, 2721 out, ingester, 2722 ev->relay)) { 2723 ndb_debug("failed to process note\n"); 2724 goto cleanup; 2725 } else { 2726 goto success; 2727 } 2728 } 2729 } else { 2730 switch (tce.evtype) { 2731 case NDB_TCE_AUTH: goto cleanup; 2732 case NDB_TCE_NOTICE: goto cleanup; 2733 case NDB_TCE_EOSE: goto cleanup; 2734 case NDB_TCE_OK: goto cleanup; 2735 case NDB_TCE_EVENT: 2736 note = tce.event.note; 2737 if (note != buf) { 2738 ndb_debug("note buffer not equal to malloc'd buffer\n"); 2739 goto cleanup; 2740 } 2741 2742 if (!ndb_ingester_process_note(ctx, note, 
note_size, 2743 out, ingester, 2744 ev->relay)) { 2745 ndb_debug("failed to process note\n"); 2746 goto cleanup; 2747 } else { 2748 goto success; 2749 } 2750 } 2751 } 2752 2753 2754 success: 2755 free(ev->json); 2756 // we don't free relay or buf since those are passed to the writer thread 2757 return 1; 2758 2759 cleanup: 2760 free(ev->json); 2761 if (ev->relay) 2762 free((void*)ev->relay); 2763 free(buf); 2764 2765 return ok; 2766 } 2767 2768 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db) 2769 { 2770 MDB_cursor *mc; 2771 MDB_val key, val; 2772 2773 if (mdb_cursor_open(txn, db, &mc)) 2774 return 0; 2775 2776 if (mdb_cursor_get(mc, &key, &val, MDB_LAST)) { 2777 mdb_cursor_close(mc); 2778 return 0; 2779 } 2780 2781 mdb_cursor_close(mc); 2782 2783 assert(key.mv_size == 8); 2784 return *((uint64_t*)key.mv_data); 2785 } 2786 2787 // 2788 // make a search key meant for user queries without any other note info 2789 static void ndb_make_search_key_low(struct ndb_search_key *key, const char *search) 2790 { 2791 memset(key->id, 0, sizeof(key->id)); 2792 key->timestamp = 0; 2793 lowercase_strncpy(key->search, search, sizeof(key->search) - 1); 2794 key->search[sizeof(key->search) - 1] = '\0'; 2795 } 2796 2797 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query) 2798 { 2799 int rc; 2800 struct ndb_search_key s; 2801 MDB_val k, v; 2802 search->cursor = NULL; 2803 2804 MDB_cursor **cursor = (MDB_cursor **)&search->cursor; 2805 2806 ndb_make_search_key_low(&s, query); 2807 2808 k.mv_data = &s; 2809 k.mv_size = sizeof(s); 2810 2811 if ((rc = mdb_cursor_open(txn->mdb_txn, 2812 txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 2813 cursor))) { 2814 printf("search_profile: cursor opened failed: %s\n", 2815 mdb_strerror(rc)); 2816 return 0; 2817 } 2818 2819 // Position cursor at the next key greater than or equal to the specified key 2820 if (mdb_cursor_get(search->cursor, &k, &v, MDB_SET_RANGE)) { 2821 printf("search_profile: cursor get 
failed\n"); 2822 goto cleanup; 2823 } else { 2824 search->key = k.mv_data; 2825 assert(v.mv_size == 8); 2826 search->profile_key = *((uint64_t*)v.mv_data); 2827 return 1; 2828 } 2829 2830 cleanup: 2831 mdb_cursor_close(search->cursor); 2832 search->cursor = NULL; 2833 return 0; 2834 } 2835 2836 void ndb_search_profile_end(struct ndb_search *search) 2837 { 2838 if (search->cursor) 2839 mdb_cursor_close(search->cursor); 2840 } 2841 2842 int ndb_search_profile_next(struct ndb_search *search) 2843 { 2844 int rc; 2845 MDB_val k, v; 2846 unsigned char *init_id; 2847 2848 init_id = search->key->id; 2849 k.mv_data = search->key; 2850 k.mv_size = sizeof(*search->key); 2851 2852 retry: 2853 if ((rc = mdb_cursor_get(search->cursor, &k, &v, MDB_NEXT))) { 2854 ndb_debug("ndb_search_profile_next: %s\n", 2855 mdb_strerror(rc)); 2856 return 0; 2857 } else { 2858 search->key = k.mv_data; 2859 assert(v.mv_size == 8); 2860 search->profile_key = *((uint64_t*)v.mv_data); 2861 2862 // skip duplicate pubkeys 2863 if (!memcmp(init_id, search->key->id, 32)) 2864 goto retry; 2865 } 2866 2867 return 1; 2868 } 2869 2870 // 2871 // The relay kind index has a layout like so (so we don't need dupsort) 2872 // 2873 // - note_id: 00 + 8 bytes 2874 // - kind: 08 + 8 bytes 2875 // - created_at: 16 + 8 bytes 2876 // - relay_url_size: 24 + 1 byte 2877 // - relay_url: 25 + n byte null-terminated string 2878 // - pad to 8 byte alignment 2879 // 2880 // The key sort order is: 2881 // 2882 // relay_url, kind, created_at 2883 // 2884 static int ndb_relay_kind_cmp(const MDB_val *a, const MDB_val *b) 2885 { 2886 int cmp; 2887 MDB_val va, vb; 2888 uint64_t iva, ivb; 2889 unsigned char *ad = (unsigned char *)a->mv_data; 2890 unsigned char *bd = (unsigned char *)b->mv_data; 2891 assert(((uint64_t)a->mv_data % 8) == 0); 2892 2893 va.mv_size = *(ad + 24); 2894 va.mv_data = ad + 25; 2895 2896 vb.mv_size = *(bd + 24); 2897 vb.mv_data = bd + 25; 2898 2899 cmp = mdb_cmp_memn(&va, &vb); 2900 if (cmp) return cmp; 2901 
2902 // kind 2903 iva = *(uint64_t*)(ad + 8); 2904 ivb = *(uint64_t*)(bd + 8); 2905 2906 if (iva < ivb) 2907 return -1; 2908 else if (iva > ivb) 2909 return 1; 2910 2911 // created_at 2912 iva = *(uint64_t*)(ad + 16); 2913 ivb = *(uint64_t*)(bd + 16); 2914 2915 if (iva < ivb) 2916 return -1; 2917 else if (iva > ivb) 2918 return 1; 2919 2920 // note_id (so we don't need dupsort logic) 2921 iva = *(uint64_t*)ad; 2922 ivb = *(uint64_t*)bd; 2923 2924 if (iva < ivb) 2925 return -1; 2926 else if (iva > ivb) 2927 return 1; 2928 2929 return 0; 2930 } 2931 2932 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) 2933 { 2934 int cmp; 2935 struct ndb_search_key *ska, *skb; 2936 2937 ska = a->mv_data; 2938 skb = b->mv_data; 2939 2940 MDB_val a2 = *a; 2941 MDB_val b2 = *b; 2942 2943 a2.mv_data = ska->search; 2944 a2.mv_size = sizeof(ska->search) + sizeof(ska->id); 2945 2946 cmp = mdb_cmp_memn(&a2, &b2); 2947 if (cmp) return cmp; 2948 2949 if (ska->timestamp < skb->timestamp) 2950 return -1; 2951 else if (ska->timestamp > skb->timestamp) 2952 return 1; 2953 2954 return 0; 2955 } 2956 2957 static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key) 2958 2959 { 2960 MDB_val key, val; 2961 int rc; 2962 struct ndb_tsid tsid; 2963 MDB_dbi pk_db; 2964 2965 pk_db = txn->lmdb->dbs[NDB_DB_PROFILE_PK]; 2966 2967 // write profile_pk + created_at index 2968 ndb_tsid_init(&tsid, note->pubkey, note->created_at); 2969 2970 key.mv_data = &tsid; 2971 key.mv_size = sizeof(tsid); 2972 val.mv_data = &profile_key; 2973 val.mv_size = sizeof(profile_key); 2974 2975 if ((rc = mdb_put(txn->mdb_txn, pk_db, &key, &val, 0))) { 2976 ndb_debug("write profile_pk(%" PRIu64 ") to db failed: %s\n", 2977 profile_key, mdb_strerror(rc)); 2978 return 0; 2979 } 2980 2981 return 1; 2982 } 2983 2984 static int ndb_write_profile(struct ndb_txn *txn, 2985 struct ndb_writer_profile *profile, 2986 uint64_t note_key) 2987 { 2988 uint64_t profile_key; 2989 struct 
ndb_note *note; 2990 void *flatbuf; 2991 size_t flatbuf_len; 2992 int rc; 2993 2994 MDB_val key, val; 2995 MDB_dbi profile_db; 2996 2997 note = profile->note.note; 2998 2999 // add note_key to profile record 3000 NdbProfileRecord_note_key_add(profile->record.builder, note_key); 3001 NdbProfileRecord_end_as_root(profile->record.builder); 3002 3003 flatbuf = profile->record.flatbuf = 3004 flatcc_builder_finalize_aligned_buffer(profile->record.builder, &flatbuf_len); 3005 3006 assert(((uint64_t)flatbuf % 8) == 0); 3007 3008 // TODO: this may not be safe!? 3009 flatbuf_len = (flatbuf_len + 7) & ~7; 3010 3011 //assert(NdbProfileRecord_verify_as_root(flatbuf, flatbuf_len) == 0); 3012 3013 // get dbs 3014 profile_db = txn->lmdb->dbs[NDB_DB_PROFILE]; 3015 3016 // get new key 3017 profile_key = ndb_get_last_key(txn->mdb_txn, profile_db) + 1; 3018 3019 // write profile to profile store 3020 key.mv_data = &profile_key; 3021 key.mv_size = sizeof(profile_key); 3022 val.mv_data = flatbuf; 3023 val.mv_size = flatbuf_len; 3024 3025 if ((rc = mdb_put(txn->mdb_txn, profile_db, &key, &val, 0))) { 3026 ndb_debug("write profile to db failed: %s\n", mdb_strerror(rc)); 3027 return 0; 3028 } 3029 3030 // write last fetched record 3031 if (!ndb_maybe_write_last_profile_fetch(txn, note)) { 3032 ndb_debug("failed to write last profile fetched record\n"); 3033 } 3034 3035 // write profile pubkey index 3036 if (!ndb_write_profile_pk_index(txn, note, profile_key)) { 3037 ndb_debug("failed to write profile pubkey index\n"); 3038 return 0; 3039 } 3040 3041 // write name, display_name profile search indices 3042 if (!ndb_write_profile_search_indices(txn, note, profile_key, 3043 flatbuf)) { 3044 ndb_debug("failed to write profile search indices\n"); 3045 return 0; 3046 } 3047 3048 return 1; 3049 } 3050 3051 // find the last id tag in a note (e, p, etc) 3052 static unsigned char *ndb_note_last_id_tag(struct ndb_note *note, char type) 3053 { 3054 unsigned char *last = NULL; 3055 struct ndb_iterator 
iter; 3056 struct ndb_str str; 3057 3058 // get the liked event id (last id) 3059 ndb_tags_iterate_start(note, &iter); 3060 3061 while (ndb_tags_iterate_next(&iter)) { 3062 if (iter.tag->count < 2) 3063 continue; 3064 3065 str = ndb_tag_str(note, iter.tag, 0); 3066 3067 // assign liked to the last e tag 3068 if (str.flag == NDB_PACKED_STR && str.str[0] == type) { 3069 str = ndb_tag_str(note, iter.tag, 1); 3070 if (str.flag == NDB_PACKED_ID) 3071 last = str.id; 3072 } 3073 } 3074 3075 return last; 3076 } 3077 3078 void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len) 3079 { 3080 MDB_val k, v; 3081 3082 k.mv_data = (unsigned char*)id; 3083 k.mv_size = 32; 3084 3085 if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &k, &v)) { 3086 //ndb_debug("ndb_get_note_meta: mdb_get note failed\n"); 3087 return NULL; 3088 } 3089 3090 if (len) 3091 *len = v.mv_size; 3092 3093 return v.mv_data; 3094 } 3095 3096 // When receiving a reaction note, look for the liked id and increase the 3097 // reaction counter in the note metadata database 3098 static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) 3099 { 3100 size_t len; 3101 void *root; 3102 int reactions, rc; 3103 MDB_val key, val; 3104 NdbEventMeta_table_t meta; 3105 unsigned char *liked = ndb_note_last_id_tag(note, 'e'); 3106 3107 if (liked == NULL) 3108 return 0; 3109 3110 root = ndb_get_note_meta(txn, liked, &len); 3111 3112 flatcc_builder_t builder; 3113 flatcc_builder_init(&builder); 3114 NdbEventMeta_start_as_root(&builder); 3115 3116 // no meta record, let's make one 3117 if (root == NULL) { 3118 NdbEventMeta_reactions_add(&builder, 1); 3119 } else { 3120 // clone existing and add to it 3121 meta = NdbEventMeta_as_root(root); 3122 3123 reactions = NdbEventMeta_reactions_get(meta); 3124 NdbEventMeta_clone(&builder, meta); 3125 NdbEventMeta_reactions_add(&builder, reactions + 1); 3126 } 3127 3128 NdbProfileRecord_end_as_root(&builder); 3129 root = 
flatcc_builder_finalize_aligned_buffer(&builder, &len); 3130 assert(((uint64_t)root % 8) == 0); 3131 3132 if (root == NULL) { 3133 ndb_debug("failed to create note metadata record\n"); 3134 return 0; 3135 } 3136 3137 // metadata is keyed on id because we want to collect stats regardless 3138 // if we have the note yet or not 3139 key.mv_data = liked; 3140 key.mv_size = 32; 3141 3142 val.mv_data = root; 3143 val.mv_size = len; 3144 3145 // write the new meta record 3146 //ndb_debug("writing stats record for "); 3147 //print_hex(liked, 32); 3148 //ndb_debug("\n"); 3149 3150 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { 3151 ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); 3152 free(root); 3153 return 0; 3154 } 3155 3156 free(root); 3157 3158 return 1; 3159 } 3160 3161 3162 static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note, 3163 uint64_t note_key) 3164 3165 { 3166 struct ndb_tsid tsid; 3167 int rc; 3168 MDB_val key, val; 3169 MDB_dbi id_db; 3170 3171 ndb_tsid_init(&tsid, note->id, note->created_at); 3172 3173 key.mv_data = &tsid; 3174 key.mv_size = sizeof(tsid); 3175 val.mv_data = ¬e_key; 3176 val.mv_size = sizeof(note_key); 3177 3178 id_db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3179 3180 if ((rc = mdb_put(txn->mdb_txn, id_db, &key, &val, 0))) { 3181 ndb_debug("write note id index to db failed: %s\n", 3182 mdb_strerror(rc)); 3183 return 0; 3184 } 3185 3186 return 1; 3187 } 3188 3189 static int ndb_filter_group_add_filters(struct ndb_filter_group *group, 3190 struct ndb_filter *filters, 3191 int num_filters) 3192 { 3193 int i; 3194 3195 for (i = 0; i < num_filters; i++) { 3196 if (!ndb_filter_group_add(group, &filters[i])) 3197 return 0; 3198 } 3199 3200 return 1; 3201 } 3202 3203 3204 static struct ndb_filter_elements * 3205 ndb_filter_find_elements(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 3206 { 3207 int i; 3208 struct ndb_filter_elements *els; 3209 3210 for (i = 0; i < 
filter->num_elements; i++) { 3211 els = ndb_filter_get_elements(filter, i); 3212 assert(els); 3213 if (els->field.type == typ) { 3214 return els; 3215 } 3216 } 3217 3218 return NULL; 3219 } 3220 3221 static const char *ndb_filter_find_search(struct ndb_filter *filter) 3222 { 3223 struct ndb_filter_elements *els; 3224 3225 if (!(els = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH))) 3226 return NULL; 3227 3228 return ndb_filter_get_string_element(filter, els, 0); 3229 } 3230 3231 int ndb_filter_is_subset_of(const struct ndb_filter *a, const struct ndb_filter *b) 3232 { 3233 int i; 3234 struct ndb_filter_elements *b_field, *a_field; 3235 3236 // Everything is always a subset of {} 3237 if (b->num_elements == 0) 3238 return 1; 3239 3240 // We can't be a subset if the number of elements in the other 3241 // filter is larger then the number of elements we have. 3242 if (b->num_elements > a->num_elements) 3243 return 0; 3244 3245 // If our filter count matches, we can only be a subset if we are 3246 // equal 3247 if (b->num_elements == a->num_elements) 3248 return ndb_filter_eq(a, b); 3249 3250 // If our element count is larger than the other filter, then we 3251 // must see if every element in the other filter exists in ours. If 3252 // so, then we are a subset of the other. 
3253 // 3254 // eg: B={k:1, a:b} <- A={t:x, k:1, a:b} 3255 // 3256 // A is a subset of B because `k:1` and `a:b` both exist in A 3257 3258 for (i = 0; i < b->num_elements; i++) { 3259 b_field = ndb_filter_get_elements((struct ndb_filter*)b, i); 3260 a_field = ndb_filter_find_elements((struct ndb_filter*)a, 3261 b_field->field.type); 3262 3263 if (a_field == NULL) 3264 return 0; 3265 3266 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_field, 3267 (struct ndb_filter*)b, b_field)) 3268 return 0; 3269 } 3270 3271 return 1; 3272 } 3273 3274 int ndb_filter_eq(const struct ndb_filter *a, const struct ndb_filter *b) 3275 { 3276 int i; 3277 struct ndb_filter_elements *a_els, *b_els; 3278 3279 if (a->num_elements != b->num_elements) 3280 return 0; 3281 3282 for (i = 0; i < a->num_elements; i++) { 3283 a_els = ndb_filter_get_elements((struct ndb_filter*)a, i); 3284 b_els = ndb_filter_find_elements((struct ndb_filter *)b, 3285 a_els->field.type); 3286 3287 if (b_els == NULL) 3288 return 0; 3289 3290 if (!ndb_filter_field_eq((struct ndb_filter*)a, a_els, 3291 (struct ndb_filter*)b, b_els)) 3292 return 0; 3293 } 3294 3295 return 1; 3296 } 3297 3298 3299 static uint64_t * 3300 ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ) 3301 { 3302 struct ndb_filter_elements *els; 3303 if ((els = ndb_filter_find_elements(filter, typ))) 3304 return &els->elements[0]; 3305 return NULL; 3306 } 3307 3308 static uint64_t *ndb_filter_get_int(struct ndb_filter *filter, 3309 enum ndb_filter_fieldtype typ) 3310 { 3311 uint64_t *el; 3312 if (NULL == (el = ndb_filter_get_elem(filter, typ))) 3313 return NULL; 3314 return el; 3315 } 3316 3317 static inline int push_query_result(struct ndb_query_results *results, 3318 struct ndb_query_result *result) 3319 { 3320 return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result)); 3321 } 3322 3323 static int compare_query_results(const void *pa, const void *pb) 3324 { 3325 struct ndb_query_result *a, *b; 3326 3327 a = 
(struct ndb_query_result *)pa; 3328 b = (struct ndb_query_result *)pb; 3329 3330 if (a->note->created_at == b->note->created_at) { 3331 return 0; 3332 } else if (a->note->created_at > b->note->created_at) { 3333 return -1; 3334 } else { 3335 return 1; 3336 } 3337 } 3338 3339 static void ndb_query_result_init(struct ndb_query_result *res, 3340 struct ndb_note *note, 3341 uint64_t note_size, 3342 uint64_t note_id) 3343 { 3344 *res = (struct ndb_query_result){ 3345 .note_id = note_id, 3346 .note_size = note_size, 3347 .note = note, 3348 }; 3349 } 3350 3351 static int query_is_full(struct ndb_query_results *results, int limit) 3352 { 3353 if (results->cur.p >= results->cur.end) 3354 return 1; 3355 3356 return cursor_count(&results->cur, sizeof(struct ndb_query_result)) >= limit; 3357 } 3358 3359 static int ndb_query_plan_execute_search(struct ndb_txn *txn, 3360 struct ndb_filter *filter, 3361 struct ndb_query_results *results, 3362 int limit) 3363 { 3364 const char *search; 3365 int i; 3366 struct ndb_text_search_results text_results; 3367 struct ndb_text_search_result *text_result; 3368 struct ndb_text_search_config config; 3369 struct ndb_query_result result; 3370 3371 ndb_default_text_search_config(&config); 3372 3373 if (!(search = ndb_filter_find_search(filter))) 3374 return 0; 3375 3376 if (!ndb_text_search_with(txn, search, &text_results, &config, filter)) 3377 return 0; 3378 3379 for (i = 0; i < text_results.num_results; i++) { 3380 if (query_is_full(results, limit)) 3381 break; 3382 3383 text_result = &text_results.results[i]; 3384 3385 result.note = text_result->note; 3386 result.note_size = text_result->note_size; 3387 result.note_id = text_result->key.note_id; 3388 3389 if (!push_query_result(results, &result)) 3390 break; 3391 } 3392 3393 return 1; 3394 } 3395 3396 static int ndb_query_plan_execute_ids(struct ndb_txn *txn, 3397 struct ndb_filter *filter, 3398 struct ndb_query_results *results, 3399 int limit) 3400 { 3401 MDB_cursor *cur; 3402 MDB_dbi db; 
3403 MDB_val k, v; 3404 int rc, i; 3405 struct ndb_filter_elements *ids; 3406 struct ndb_note *note; 3407 struct ndb_query_result res; 3408 struct ndb_tsid tsid, *ptsid; 3409 uint64_t note_id, until, *pint; 3410 size_t note_size; 3411 unsigned char *id; 3412 3413 until = UINT64_MAX; 3414 3415 if (!(ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS))) 3416 return 0; 3417 3418 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3419 until = *pint; 3420 3421 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3422 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3423 return 0; 3424 3425 // for each id in our ids filter, find in the db 3426 for (i = 0; i < ids->count; i++) { 3427 if (query_is_full(results, limit)) 3428 break; 3429 3430 id = ndb_filter_get_id_element(filter, ids, i); 3431 ndb_tsid_init(&tsid, (unsigned char *)id, until); 3432 3433 k.mv_data = &tsid; 3434 k.mv_size = sizeof(tsid); 3435 3436 if (!ndb_cursor_start(cur, &k, &v)) 3437 continue; 3438 3439 ptsid = (struct ndb_tsid *)k.mv_data; 3440 note_id = *(uint64_t*)v.mv_data; 3441 3442 if (memcmp(id, ptsid->id, 32)) 3443 continue; 3444 3445 // get the note because we need it to match against the filter 3446 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3447 continue; 3448 3449 // Sure this particular lookup matched the index query, but 3450 // does it match the entire filter? Check! We also pass in 3451 // things we've already matched via the filter so we don't have 3452 // to check again. This can be pretty important for filters 3453 // with a large number of entries. 
3454 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS)) 3455 continue; 3456 3457 ndb_query_result_init(&res, note, note_size, note_id); 3458 if (!push_query_result(results, &res)) 3459 break; 3460 } 3461 3462 mdb_cursor_close(cur); 3463 return 1; 3464 } 3465 3466 // 3467 // encode a tag index key 3468 // 3469 // consists of: 3470 // 3471 // u8 tag 3472 // u8 tag_val_len 3473 // [u8] tag_val_bytes 3474 // u64 created_at 3475 // 3476 static int ndb_encode_tag_key(unsigned char *buf, int buf_size, 3477 char tag, const unsigned char *val, 3478 unsigned char val_len, 3479 uint64_t timestamp) 3480 { 3481 struct cursor writer; 3482 int ok; 3483 3484 // quick exit for obvious case where it will be too big. There can be 3485 // values of val_len that still fail, but we just let the writer handle 3486 // those failure cases 3487 if (val_len >= buf_size) 3488 return 0; 3489 3490 make_cursor(buf, buf + buf_size, &writer); 3491 3492 ok = 3493 cursor_push_byte(&writer, tag) && 3494 cursor_push(&writer, (unsigned char*)val, val_len) && 3495 cursor_push(&writer, (unsigned char*)×tamp, sizeof(timestamp)); 3496 3497 if (!ok) 3498 return 0; 3499 3500 return writer.p - writer.start; 3501 } 3502 3503 static int ndb_query_plan_execute_authors(struct ndb_txn *txn, 3504 struct ndb_filter *filter, 3505 struct ndb_query_results *results, 3506 int limit) 3507 { 3508 MDB_val k, v; 3509 MDB_cursor *cur; 3510 int rc, i; 3511 uint64_t *pint, until, since, note_key; 3512 unsigned char *author; 3513 struct ndb_note *note; 3514 size_t note_size; 3515 struct ndb_filter_elements *authors; 3516 struct ndb_query_result res; 3517 struct ndb_tsid tsid, *ptsid; 3518 enum ndb_dbs db; 3519 3520 db = txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY]; 3521 3522 if (!(authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS))) 3523 return 0; 3524 3525 until = UINT64_MAX; 3526 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3527 until = *pint; 3528 3529 since = 0; 3530 if ((pint = 
ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3531 since = *pint; 3532 3533 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3534 return 0; 3535 3536 for (i = 0; i < authors->count; i++) { 3537 author = ndb_filter_get_id_element(filter, authors, i); 3538 3539 ndb_tsid_init(&tsid, author, until); 3540 3541 k.mv_data = &tsid; 3542 k.mv_size = sizeof(tsid); 3543 3544 if (!ndb_cursor_start(cur, &k, &v)) 3545 continue; 3546 3547 // for each id in our ids filter, find in the db 3548 while (!query_is_full(results, limit)) { 3549 ptsid = (struct ndb_tsid *)k.mv_data; 3550 note_key = *(uint64_t*)v.mv_data; 3551 3552 // don't continue the scan if we're below `since` 3553 if (ptsid->timestamp < since) 3554 break; 3555 3556 // our author should match, if not bail 3557 if (memcmp(author, ptsid->id, 32)) 3558 break; 3559 3560 // fetch the note, we need it for our query results 3561 // and to match further against the filter 3562 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_size))) 3563 goto next; 3564 3565 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_AUTHORS)) 3566 goto next; 3567 3568 ndb_query_result_init(&res, note, note_size, note_key); 3569 if (!push_query_result(results, &res)) 3570 break; 3571 3572 next: 3573 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3574 break; 3575 } 3576 } 3577 3578 mdb_cursor_close(cur); 3579 return 1; 3580 } 3581 3582 static int ndb_query_plan_execute_created_at(struct ndb_txn *txn, 3583 struct ndb_filter *filter, 3584 struct ndb_query_results *results, 3585 int limit) 3586 { 3587 MDB_dbi db; 3588 MDB_val k, v; 3589 MDB_cursor *cur; 3590 int rc; 3591 struct ndb_note *note; 3592 struct ndb_tsid key, *pkey; 3593 uint64_t *pint, until, since, note_id; 3594 size_t note_size; 3595 struct ndb_query_result res; 3596 unsigned char high_key[32] = {0xFF}; 3597 3598 db = txn->lmdb->dbs[NDB_DB_NOTE_ID]; 3599 3600 until = UINT64_MAX; 3601 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3602 until = *pint; 3603 3604 since = 0; 
3605 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3606 since = *pint; 3607 3608 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3609 return 0; 3610 3611 // if we have until, start there, otherwise just use max 3612 ndb_tsid_init(&key, high_key, until); 3613 k.mv_data = &key; 3614 k.mv_size = sizeof(key); 3615 3616 if (!ndb_cursor_start(cur, &k, &v)) 3617 return 1; 3618 3619 while (!query_is_full(results, limit)) { 3620 pkey = (struct ndb_tsid *)k.mv_data; 3621 note_id = *(uint64_t*)v.mv_data; 3622 assert(v.mv_size == 8); 3623 3624 // don't continue the scan if we're below `since` 3625 if (pkey->timestamp < since) 3626 break; 3627 3628 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3629 goto next; 3630 3631 // does this entry match our filter? 3632 if (!ndb_filter_matches_with(filter, note, 0)) 3633 goto next; 3634 3635 ndb_query_result_init(&res, note, (uint64_t)note_size, note_id); 3636 if (!push_query_result(results, &res)) 3637 break; 3638 next: 3639 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3640 break; 3641 } 3642 3643 mdb_cursor_close(cur); 3644 return 1; 3645 } 3646 3647 static int ndb_query_plan_execute_tags(struct ndb_txn *txn, 3648 struct ndb_filter *filter, 3649 struct ndb_query_results *results, 3650 int limit) 3651 { 3652 MDB_cursor *cur; 3653 MDB_dbi db; 3654 MDB_val k, v; 3655 int len, taglen, rc, i; 3656 uint64_t *pint, until, note_id; 3657 size_t note_size; 3658 unsigned char key_buffer[255]; 3659 struct ndb_note *note; 3660 struct ndb_filter_elements *tags; 3661 unsigned char *tag; 3662 struct ndb_query_result res; 3663 3664 db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3665 3666 if (!(tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS))) 3667 return 0; 3668 3669 until = UINT64_MAX; 3670 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3671 until = *pint; 3672 3673 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3674 return 0; 3675 3676 for (i = 0; i < tags->count; i++) { 3677 tag = 
ndb_filter_get_id_element(filter, tags, i); 3678 3679 taglen = tags->field.elem_type == NDB_ELEMENT_ID 3680 ? 32 : strlen((const char*)tag); 3681 3682 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3683 tags->field.tag, tag, taglen, 3684 until))) 3685 return 0; 3686 3687 k.mv_data = key_buffer; 3688 k.mv_size = len; 3689 3690 if (!ndb_cursor_start(cur, &k, &v)) 3691 continue; 3692 3693 // for each id in our ids filter, find in the db 3694 while (!query_is_full(results, limit)) { 3695 // check if tag value matches, bail if not 3696 if (((unsigned char *)k.mv_data)[0] != tags->field.tag) 3697 break; 3698 3699 // check if tag value matches, bail if not 3700 if (taglen != k.mv_size - 9) 3701 break; 3702 3703 if (memcmp((unsigned char *)k.mv_data+1, tag, k.mv_size-9)) 3704 break; 3705 3706 note_id = *(uint64_t*)v.mv_data; 3707 3708 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3709 goto next; 3710 3711 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS)) 3712 goto next; 3713 3714 ndb_query_result_init(&res, note, note_size, note_id); 3715 if (!push_query_result(results, &res)) 3716 break; 3717 3718 next: 3719 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3720 break; 3721 } 3722 } 3723 3724 mdb_cursor_close(cur); 3725 return 1; 3726 } 3727 3728 static int ndb_query_plan_execute_kinds(struct ndb_txn *txn, 3729 struct ndb_filter *filter, 3730 struct ndb_query_results *results, 3731 int limit) 3732 { 3733 MDB_cursor *cur; 3734 MDB_dbi db; 3735 MDB_val k, v; 3736 struct ndb_note *note; 3737 struct ndb_u64_ts tsid, *ptsid; 3738 struct ndb_filter_elements *kinds; 3739 struct ndb_query_result res; 3740 uint64_t kind, note_id, until, since, *pint; 3741 size_t note_size; 3742 int i, rc; 3743 3744 // we should have kinds in a kinds filter! 
3745 if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS))) 3746 return 0; 3747 3748 until = UINT64_MAX; 3749 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 3750 until = *pint; 3751 3752 since = 0; 3753 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 3754 since = *pint; 3755 3756 db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 3757 3758 if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur))) 3759 return 0; 3760 3761 for (i = 0; i < kinds->count; i++) { 3762 if (query_is_full(results, limit)) 3763 break; 3764 3765 kind = kinds->elements[i]; 3766 ndb_debug("kind %" PRIu64 "\n", kind); 3767 ndb_u64_ts_init(&tsid, kind, until); 3768 3769 k.mv_data = &tsid; 3770 k.mv_size = sizeof(tsid); 3771 3772 if (!ndb_cursor_start(cur, &k, &v)) 3773 continue; 3774 3775 // for each id in our ids filter, find in the db 3776 while (!query_is_full(results, limit)) { 3777 ptsid = (struct ndb_u64_ts *)k.mv_data; 3778 if (ptsid->u64 != kind) 3779 break; 3780 3781 // don't continue the scan if we're below `since` 3782 if (ptsid->timestamp < since) 3783 break; 3784 3785 note_id = *(uint64_t*)v.mv_data; 3786 if (!(note = ndb_get_note_by_key(txn, note_id, ¬e_size))) 3787 goto next; 3788 3789 if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS)) 3790 goto next; 3791 3792 ndb_query_result_init(&res, note, note_size, note_id); 3793 if (!push_query_result(results, &res)) 3794 break; 3795 3796 next: 3797 if (mdb_cursor_get(cur, &k, &v, MDB_PREV)) 3798 break; 3799 } 3800 } 3801 3802 mdb_cursor_close(cur); 3803 return 1; 3804 } 3805 3806 static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter) 3807 { 3808 struct ndb_filter_elements *ids, *kinds, *authors, *tags, *search; 3809 3810 ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS); 3811 search = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH); 3812 kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS); 3813 authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS); 3814 tags = 
ndb_filter_find_elements(filter, NDB_FILTER_TAGS); 3815 3816 // this is rougly similar to the heuristic in strfry's dbscan 3817 if (search) { 3818 return NDB_PLAN_SEARCH; 3819 } else if (ids) { 3820 return NDB_PLAN_IDS; 3821 } else if (kinds && authors && authors->count <= 10) { 3822 return NDB_PLAN_AUTHOR_KINDS; 3823 } else if (authors && authors->count <= 10) { 3824 return NDB_PLAN_AUTHORS; 3825 } else if (tags && tags->count <= 10) { 3826 return NDB_PLAN_TAGS; 3827 } else if (kinds) { 3828 return NDB_PLAN_KINDS; 3829 } 3830 3831 return NDB_PLAN_CREATED; 3832 } 3833 3834 static const char *ndb_query_plan_name(int plan_id) 3835 { 3836 switch (plan_id) { 3837 case NDB_PLAN_IDS: return "ids"; 3838 case NDB_PLAN_SEARCH: return "search"; 3839 case NDB_PLAN_KINDS: return "kinds"; 3840 case NDB_PLAN_TAGS: return "tags"; 3841 case NDB_PLAN_CREATED: return "created"; 3842 case NDB_PLAN_AUTHORS: return "authors"; 3843 } 3844 3845 return "unknown"; 3846 } 3847 3848 static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter, 3849 struct ndb_query_result *res, int capacity, 3850 int *results_out) 3851 { 3852 struct ndb_query_results results; 3853 uint64_t limit, *pint; 3854 enum ndb_query_plan plan; 3855 limit = capacity; 3856 3857 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 3858 limit = *pint; 3859 3860 limit = min(capacity, limit); 3861 make_cursor((unsigned char *)res, 3862 ((unsigned char *)res) + limit * sizeof(*res), 3863 &results.cur); 3864 3865 plan = ndb_filter_plan(filter); 3866 ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan)); 3867 switch (plan) { 3868 // We have a list of ids, just open a cursor and jump to each once 3869 case NDB_PLAN_IDS: 3870 if (!ndb_query_plan_execute_ids(txn, filter, &results, limit)) 3871 return 0; 3872 break; 3873 3874 case NDB_PLAN_SEARCH: 3875 if (!ndb_query_plan_execute_search(txn, filter, &results, limit)) 3876 return 0; 3877 break; 3878 3879 // We have just kinds, just scan the kind index 
3880 case NDB_PLAN_KINDS: 3881 if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit)) 3882 return 0; 3883 break; 3884 3885 case NDB_PLAN_TAGS: 3886 if (!ndb_query_plan_execute_tags(txn, filter, &results, limit)) 3887 return 0; 3888 break; 3889 case NDB_PLAN_CREATED: 3890 if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit)) 3891 return 0; 3892 break; 3893 case NDB_PLAN_AUTHORS: 3894 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3895 return 0; 3896 break; 3897 case NDB_PLAN_AUTHOR_KINDS: 3898 /* TODO: author kinds 3899 if (!ndb_query_plan_execute_author_kinds(txn, filter, &results, limit)) 3900 return 0; 3901 */ 3902 if (!ndb_query_plan_execute_authors(txn, filter, &results, limit)) 3903 return 0; 3904 break; 3905 } 3906 3907 *results_out = cursor_count(&results.cur, sizeof(*res)); 3908 return 1; 3909 } 3910 3911 int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, 3912 struct ndb_query_result *results, int result_capacity, int *count) 3913 { 3914 int i, out; 3915 struct ndb_query_result *p = results; 3916 3917 out = 0; 3918 *count = 0; 3919 3920 for (i = 0; i < num_filters; i++) { 3921 if (!ndb_query_filter(txn, &filters[i], p, 3922 result_capacity, &out)) { 3923 return 0; 3924 } 3925 3926 *count += out; 3927 p += out; 3928 result_capacity -= out; 3929 if (result_capacity <= 0) 3930 break; 3931 } 3932 3933 // sort results 3934 qsort(results, *count, sizeof(*results), compare_query_results); 3935 return 1; 3936 } 3937 3938 static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note, 3939 uint64_t note_key) 3940 { 3941 unsigned char key_buffer[255]; 3942 struct ndb_iterator iter; 3943 struct ndb_str tkey, tval; 3944 char tchar; 3945 int len, rc; 3946 MDB_val key, val; 3947 MDB_dbi tags_db; 3948 3949 tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS]; 3950 3951 ndb_tags_iterate_start(note, &iter); 3952 3953 while (ndb_tags_iterate_next(&iter)) { 3954 if (iter.tag->count < 2) 3955 
continue; 3956 3957 tkey = ndb_tag_str(note, iter.tag, 0); 3958 3959 // we only write indices for 1-char tags. 3960 tchar = tkey.str[0]; 3961 if (tchar == 0 || tkey.str[1] != 0) 3962 continue; 3963 3964 tval = ndb_tag_str(note, iter.tag, 1); 3965 len = ndb_str_len(&tval); 3966 3967 if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer), 3968 tchar, tval.id, (unsigned char)len, 3969 ndb_note_created_at(note)))) { 3970 // this will fail when we try to encode a key that is 3971 // too big 3972 continue; 3973 } 3974 3975 //ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len); 3976 3977 key.mv_data = key_buffer; 3978 key.mv_size = len; 3979 3980 val.mv_data = ¬e_key; 3981 val.mv_size = sizeof(note_key); 3982 3983 if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) { 3984 ndb_debug("write note tag index to db failed: %s\n", 3985 mdb_strerror(rc)); 3986 return 0; 3987 } 3988 } 3989 3990 return 1; 3991 } 3992 3993 static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note, 3994 uint64_t note_key) 3995 { 3996 struct ndb_u64_ts tsid; 3997 int rc; 3998 MDB_val key, val; 3999 MDB_dbi kind_db; 4000 4001 ndb_u64_ts_init(&tsid, note->kind, note->created_at); 4002 4003 key.mv_data = &tsid; 4004 key.mv_size = sizeof(tsid); 4005 val.mv_data = ¬e_key; 4006 val.mv_size = sizeof(note_key); 4007 4008 kind_db = txn->lmdb->dbs[NDB_DB_NOTE_KIND]; 4009 4010 if ((rc = mdb_put(txn->mdb_txn, kind_db, &key, &val, 0))) { 4011 ndb_debug("write note kind index to db failed: %s\n", 4012 mdb_strerror(rc)); 4013 return 0; 4014 } 4015 4016 return 1; 4017 } 4018 4019 static int ndb_write_word_to_index(struct ndb_txn *txn, const char *word, 4020 int word_len, int word_index, 4021 uint64_t timestamp, uint64_t note_id) 4022 { 4023 // cap to some reasonable key size 4024 unsigned char buffer[1024]; 4025 int keysize, rc; 4026 MDB_val k, v; 4027 MDB_dbi text_db; 4028 4029 // build our compressed text index key 4030 if (!ndb_make_text_search_key(buffer, 
sizeof(buffer), word_index, 4031 word_len, word, timestamp, note_id, 4032 &keysize)) { 4033 // probably too big 4034 4035 return 0; 4036 } 4037 4038 k.mv_data = buffer; 4039 k.mv_size = keysize; 4040 4041 v.mv_data = NULL; 4042 v.mv_size = 0; 4043 4044 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 4045 4046 if ((rc = mdb_put(txn->mdb_txn, text_db, &k, &v, 0))) { 4047 ndb_debug("write note text index to db failed: %s\n", 4048 mdb_strerror(rc)); 4049 return 0; 4050 } 4051 4052 return 1; 4053 } 4054 4055 4056 4057 // break a string into individual words for querying or for building the 4058 // fulltext search index. This is callback based so we don't need to 4059 // build up an intermediate structure 4060 static int ndb_parse_words(struct cursor *cur, void *ctx, ndb_word_parser_fn fn) 4061 { 4062 int word_len, words; 4063 const char *word; 4064 4065 words = 0; 4066 4067 while (cur->p < cur->end) { 4068 consume_whitespace_or_punctuation(cur); 4069 if (cur->p >= cur->end) 4070 break; 4071 word = (const char *)cur->p; 4072 4073 if (!consume_until_boundary(cur)) 4074 break; 4075 4076 // start of word or end 4077 word_len = cur->p - (unsigned char *)word; 4078 if (word_len == 0 && cur->p >= cur->end) 4079 break; 4080 4081 if (word_len == 0) { 4082 if (!cursor_skip(cur, 1)) 4083 break; 4084 continue; 4085 } 4086 4087 //ndb_debug("writing word index '%.*s'\n", word_len, word); 4088 4089 if (!fn(ctx, word, word_len, words)) 4090 continue; 4091 4092 words++; 4093 } 4094 4095 return 1; 4096 } 4097 4098 struct ndb_word_writer_ctx 4099 { 4100 struct ndb_txn *txn; 4101 struct ndb_note *note; 4102 uint64_t note_id; 4103 }; 4104 4105 static int ndb_fulltext_word_writer(void *ctx, 4106 const char *word, int word_len, int words) 4107 { 4108 struct ndb_word_writer_ctx *wctx = ctx; 4109 4110 if (!ndb_write_word_to_index(wctx->txn, word, word_len, words, 4111 wctx->note->created_at, wctx->note_id)) { 4112 // too big to write this one, just skip it 4113 ndb_debug("failed to write word 
// Count how many leading bytes of str1 and str2 are equal when compared
// case-insensitively with tolower(). The comparison stops at the first
// mismatch or at the end of the shorter string.
//
// len1/len2 are byte lengths; the strings don't need to be NUL-terminated.
// Returns the number of matching leading bytes (0 if either length is <= 0).
static int prefix_count(const char *str1, int len1, const char *str2, int len2) {
	int i;
	int min_len = len1 < len2 ? len1 : len2;

	for (i = 0; i < min_len; i++) {
		// <ctype.h> functions are only defined for values that fit in
		// an unsigned char (or EOF). Plain char may be signed, so
		// bytes >= 0x80 (e.g. UTF-8) must be converted first.
		if (tolower((unsigned char)str1[i]) != tolower((unsigned char)str2[i]))
			break;
	}

	return i;
}
Position 4235 // the cursor at the next key greater than or equal to the specified 4236 // key. 4237 // 4238 // Subsequent searches should use MDB_NEXT 4239 if (mdb_cursor_get(cursor, k, &v, op)) { 4240 // we should only do this if we're going in reverse 4241 if (op == MDB_SET_RANGE && order_op == MDB_PREV) { 4242 // if set range worked and our key exists, it should be 4243 // the one right before this one 4244 if (mdb_cursor_get(cursor, k, &v, MDB_PREV)) 4245 return 0; 4246 } else { 4247 return 0; 4248 } 4249 } 4250 4251 retry: 4252 retries++; 4253 /* 4254 printf("continuing from "); 4255 if (ndb_unpack_text_search_key(k->mv_data, k->mv_size, &search_key)) { 4256 ndb_print_text_search_key(&search_key); 4257 } else { printf("??"); } 4258 printf("\n"); 4259 */ 4260 4261 make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); 4262 4263 if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { 4264 fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); 4265 return 0; 4266 } 4267 4268 // if the note id doesn't match the last result, then we stop trying 4269 // each search word 4270 if (last_result && last_result->key.note_id != result->key.note_id) { 4271 return 0; 4272 } 4273 4274 // On success, this could still be not related at all. 4275 // It could just be adjacent to the word. Let's check 4276 // if we have a matching prefix at least. 4277 4278 // Before we unpack the entire key, let's quickly 4279 // unpack just the string to check the prefix. 
We don't 4280 // need to unpack the entire key if the prefix doesn't 4281 // match 4282 if (!ndb_unpack_text_search_key_string(&key_cursor, 4283 &result->key.str, 4284 &result->key.str_len)) { 4285 // this should never happen 4286 fprintf(stderr, "UNUSUAL: failed to unpack text search key string\n"); 4287 return 0; 4288 } 4289 4290 if (!ndb_prefix_matches(result, search_word)) { 4291 /* 4292 printf("result prefix '%.*s' didn't match search word '%.*s'\n", 4293 result->key.str_len, result->key.str, 4294 search_word->word_len, search_word->word); 4295 */ 4296 // we should only do this if we're going in reverse 4297 if (retries == 0 && op == MDB_SET_RANGE && order_op == MDB_PREV) { 4298 // if set range worked and our key exists, it should be 4299 // the one right before this one 4300 mdb_cursor_get(cursor, k, &v, MDB_PREV); 4301 goto retry; 4302 } else { 4303 return 0; 4304 } 4305 } 4306 4307 // Unpack the remaining text search key, we will need this information 4308 // when building up our search results. 
4309 if (!ndb_unpack_remaining_text_search_key(&key_cursor, &result->key)) { 4310 // This should never happen 4311 fprintf(stderr, "UNUSUAL: failed to unpack text search key\n"); 4312 return 0; 4313 } 4314 4315 /* 4316 if (last_result) { 4317 if (result->key.word_index < last_result->key.word_index) { 4318 fprintf(stderr, "skipping '%.*s' because it is before last result '%.*s'\n", 4319 result->key.str_len, result->key.str, 4320 last_result->key.str_len, last_result->key.str); 4321 return 0; 4322 } 4323 } 4324 */ 4325 4326 return 1; 4327 } 4328 4329 static void ndb_text_search_results_init( 4330 struct ndb_text_search_results *results) { 4331 results->num_results = 0; 4332 } 4333 4334 void ndb_default_text_search_config(struct ndb_text_search_config *cfg) 4335 { 4336 cfg->order = NDB_ORDER_DESCENDING; 4337 cfg->limit = MAX_TEXT_SEARCH_RESULTS; 4338 } 4339 4340 void ndb_text_search_config_set_order(struct ndb_text_search_config *cfg, 4341 enum ndb_search_order order) 4342 { 4343 cfg->order = order; 4344 } 4345 4346 void ndb_text_search_config_set_limit(struct ndb_text_search_config *cfg, int limit) 4347 { 4348 cfg->limit = limit; 4349 } 4350 4351 static int compare_search_words(const void *pa, const void *pb) 4352 { 4353 struct ndb_word *a, *b; 4354 4355 a = (struct ndb_word *)pa; 4356 b = (struct ndb_word *)pb; 4357 4358 if (a->word_len == b->word_len) { 4359 return 0; 4360 } else if (a->word_len > b->word_len) { 4361 // biggest words should be at the front of the list, 4362 // so we say it's "smaller" here 4363 return -1; 4364 } else { 4365 return 1; 4366 } 4367 } 4368 4369 // Sort search words from largest to smallest. Larger words are less likely 4370 // in the index, allowing our scan to walk fewer words at the root when 4371 // recursively matching. 
4372 void sort_largest_to_smallest(struct ndb_search_words *words) 4373 { 4374 qsort(words->words, words->num_words, sizeof(words->words[0]), compare_search_words); 4375 } 4376 4377 4378 int ndb_text_search_with(struct ndb_txn *txn, const char *query, 4379 struct ndb_text_search_results *results, 4380 struct ndb_text_search_config *config, 4381 struct ndb_filter *filter) 4382 { 4383 unsigned char buffer[1024], *buf; 4384 unsigned char saved_buf[1024], *saved; 4385 struct ndb_text_search_result *result, *last_result; 4386 struct ndb_text_search_result candidate, last_candidate; 4387 struct ndb_search_words search_words; 4388 //struct ndb_text_search_key search_key; 4389 struct ndb_word *search_word; 4390 struct ndb_note *note; 4391 struct cursor cur; 4392 uint64_t since, until, timestamp_op, *pint; 4393 size_t note_size; 4394 ndb_text_search_key_order_fn key_order_fn; 4395 MDB_dbi text_db; 4396 MDB_cursor *cursor; 4397 MDB_val k, v; 4398 int i, j, keysize, saved_size, limit; 4399 MDB_cursor_op op, order_op; 4400 4401 note_size = 0; 4402 note = 0; 4403 saved = NULL; 4404 ndb_text_search_results_init(results); 4405 ndb_search_words_init(&search_words); 4406 4407 until = UINT64_MAX; 4408 since = 0; 4409 limit = MAX_TEXT_SEARCH_RESULTS; 4410 4411 // until, since from filter 4412 if (filter != NULL) { 4413 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL))) 4414 until = *pint; 4415 4416 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE))) 4417 since = *pint; 4418 4419 if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT))) 4420 limit = *pint; 4421 } 4422 4423 order_op = MDB_PREV; 4424 key_order_fn = ndb_make_text_search_key_high; 4425 timestamp_op = until; 4426 if (config) { 4427 if (config->order == NDB_ORDER_ASCENDING) { 4428 order_op = MDB_NEXT; 4429 // set the min timestamp value to since when ascending 4430 timestamp_op = since; 4431 key_order_fn = ndb_make_text_search_key_low; 4432 } 4433 limit = min(limit, config->limit); 4434 } 4435 // end search 
config 4436 4437 text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT]; 4438 make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur); 4439 4440 ndb_parse_words(&cur, &search_words, ndb_parse_search_words); 4441 if (search_words.num_words == 0) 4442 return 0; 4443 4444 if ((i = mdb_cursor_open(txn->mdb_txn, text_db, &cursor))) { 4445 fprintf(stderr, "nd_text_search: mdb_cursor_open failed, error %d\n", i); 4446 return 0; 4447 } 4448 4449 // This should complete the query quicker because the larger words are 4450 // likely to have fewer entries in the search index. This is not always 4451 // true. Words with higher frequency (like bitcoin on nostr in 2024) 4452 // may be slower. TODO: Skip word recursion by leveraging a minimal 4453 // perfect hashmap of parsed words on a note 4454 sort_largest_to_smallest(&search_words); 4455 4456 // for each word, we recursively find all of the submatches 4457 while (results->num_results < limit) { 4458 last_result = NULL; 4459 result = &results->results[results->num_results]; 4460 4461 // if we have saved, then we continue from the last root search 4462 // sequence 4463 if (saved) { 4464 buf = saved_buf; 4465 saved = NULL; 4466 keysize = saved_size; 4467 4468 k.mv_data = buf; 4469 k.mv_size = saved_size; 4470 4471 // reposition the cursor so we can continue 4472 if (mdb_cursor_get(cursor, &k, &v, MDB_SET_RANGE)) 4473 break; 4474 4475 op = order_op; 4476 } else { 4477 // construct a packed fulltext search key using this 4478 // word. 
This key doesn't contain any timestamp or index 4479 // info, so it should range match instead of exact 4480 // match 4481 if (!key_order_fn(buffer, sizeof(buffer), 4482 search_words.words[0].word_len, 4483 search_words.words[0].word, 4484 timestamp_op, 4485 &keysize)) 4486 { 4487 // word is too big to fit in 1024-sized key 4488 continue; 4489 } 4490 4491 buf = buffer; 4492 op = MDB_SET_RANGE; 4493 } 4494 4495 for (j = 0; j < search_words.num_words; j++) { 4496 search_word = &search_words.words[j]; 4497 4498 // shouldn't happen but let's be defensive a bit 4499 if (search_word->word_len == 0) 4500 continue; 4501 4502 // if we already matched a note in this phrase, make 4503 // sure we're including the note id in the query 4504 if (last_result) { 4505 // we are narrowing down a search. 4506 // if we already have this note id, just continue 4507 for (i = 0; i < results->num_results; i++) { 4508 if (results->results[i].key.note_id == last_result->key.note_id) 4509 // we can't break here to 4510 // leave the word loop so 4511 // have to use a goto 4512 goto cont; 4513 } 4514 4515 if (!ndb_make_noted_text_search_key( 4516 buffer, sizeof(buffer), 4517 search_word->word_len, 4518 search_word->word, 4519 last_result->key.timestamp, 4520 last_result->key.note_id, 4521 &keysize)) 4522 { 4523 continue; 4524 } 4525 4526 buf = buffer; 4527 } 4528 4529 k.mv_data = buf; 4530 k.mv_size = keysize; 4531 4532 // TODO: we can speed this up with the minimal perfect 4533 // hashmap by quickly rejecting the remaining words 4534 // by looking in the word hashmap on the note. 
This 4535 // would allow us to skip the recursive word lookup 4536 // thing 4537 if (!ndb_text_search_next_word(cursor, op, &k, 4538 search_word, 4539 last_result, 4540 &candidate, 4541 order_op)) { 4542 // we didn't find a match for this note_id 4543 if (j == 0) 4544 // if we're at one of the root words, 4545 // this means that there are no further 4546 // root word matches for any note, so 4547 // we know we're done 4548 goto done; 4549 else 4550 break; 4551 } 4552 4553 *result = candidate; 4554 op = MDB_SET_RANGE; 4555 4556 // save the first key match, since we will continue from 4557 // this on the next root word result 4558 if (j == 0) { 4559 if (!saved) { 4560 memcpy(saved_buf, k.mv_data, k.mv_size); 4561 saved = saved_buf; 4562 saved_size = k.mv_size; 4563 } 4564 4565 // since we will be trying to match the same 4566 // note_id on all subsequent word matches, 4567 // let's lookup this note and make sure it 4568 // matches the filter if we have one. If it 4569 // doesn't match, we can quickly skip the 4570 // remaining word queries 4571 if (filter) { 4572 if ((note = ndb_get_note_by_key(txn, 4573 result->key.note_id, 4574 ¬e_size))) 4575 { 4576 if (!ndb_filter_matches(filter, note)) { 4577 break; 4578 } 4579 result->note = note; 4580 result->note_size = note_size; 4581 } 4582 } 4583 } 4584 4585 result->note = note; 4586 result->note_size = note_size; 4587 last_candidate = *result; 4588 last_result = &last_candidate; 4589 } 4590 4591 // we matched all of the queries! 
4592 if (j == search_words.num_words) { 4593 results->num_results++; 4594 } 4595 4596 cont: 4597 ; 4598 } 4599 4600 done: 4601 mdb_cursor_close(cursor); 4602 4603 return 1; 4604 } 4605 4606 int ndb_text_search(struct ndb_txn *txn, const char *query, 4607 struct ndb_text_search_results *results, 4608 struct ndb_text_search_config *config) 4609 { 4610 return ndb_text_search_with(txn, query, results, config, NULL); 4611 } 4612 4613 static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key, 4614 struct ndb_blocks *blocks) 4615 { 4616 int rc; 4617 MDB_val key, val; 4618 4619 // make sure we're not writing the owned flag to the db 4620 blocks->flags &= ~NDB_BLOCK_FLAG_OWNED; 4621 4622 key.mv_data = ¬e_key; 4623 key.mv_size = sizeof(note_key); 4624 val.mv_data = blocks; 4625 val.mv_size = ndb_blocks_total_size(blocks); 4626 assert((val.mv_size % 8) == 0); 4627 4628 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) { 4629 ndb_debug("write version to note_blocks failed: %s\n", 4630 mdb_strerror(rc)); 4631 return; 4632 } 4633 } 4634 4635 static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, 4636 uint64_t note_key, unsigned char *scratch, 4637 size_t scratch_size) 4638 { 4639 size_t content_len; 4640 const char *content; 4641 struct ndb_blocks *blocks; 4642 4643 content_len = ndb_note_content_length(note); 4644 content = ndb_note_content(note); 4645 4646 if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) { 4647 //ndb_debug("failed to parse content '%.*s'\n", content_len, content); 4648 return 0; 4649 } 4650 4651 ndb_write_blocks(txn, note_key, blocks); 4652 return 1; 4653 } 4654 4655 static uint64_t ndb_write_note(struct ndb_txn *txn, 4656 struct ndb_writer_note *note, 4657 unsigned char *scratch, size_t scratch_size, 4658 uint32_t ndb_flags) 4659 { 4660 int rc, relay_len = 0; 4661 uint64_t note_key, kind; 4662 MDB_dbi note_db; 4663 MDB_val key, val; 4664 4665 kind = 
note->note->kind; 4666 4667 // let's quickly sanity check if we already have this note 4668 if (ndb_get_notekey_by_id(txn, note->note->id)) 4669 return 0; 4670 4671 // get dbs 4672 note_db = txn->lmdb->dbs[NDB_DB_NOTE]; 4673 4674 // get new key 4675 note_key = ndb_get_last_key(txn->mdb_txn, note_db) + 1; 4676 4677 // write note to event store 4678 key.mv_data = ¬e_key; 4679 key.mv_size = sizeof(note_key); 4680 val.mv_data = note->note; 4681 val.mv_size = note->note_len; 4682 4683 if ((rc = mdb_put(txn->mdb_txn, note_db, &key, &val, 0))) { 4684 ndb_debug("write note to db failed: %s\n", mdb_strerror(rc)); 4685 return 0; 4686 } 4687 4688 if (note->relay != NULL) 4689 relay_len = strlen(note->relay); 4690 4691 ndb_write_note_id_index(txn, note->note, note_key); 4692 ndb_write_note_kind_index(txn, note->note, note_key); 4693 ndb_write_note_tag_index(txn, note->note, note_key); 4694 ndb_write_note_pubkey_index(txn, note->note, note_key); 4695 ndb_write_note_pubkey_kind_index(txn, note->note, note_key); 4696 ndb_write_note_relay_kind_index(txn, kind, note_key, 4697 ndb_note_created_at(note->note), 4698 note->relay, relay_len); 4699 ndb_write_note_relay(txn, note_key, note->relay, relay_len); 4700 4701 // only parse content and do fulltext index on text and longform notes 4702 if (kind == 1 || kind == 30023) { 4703 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_FULLTEXT)) { 4704 if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) 4705 return 0; 4706 } 4707 4708 // write note blocks 4709 if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_NOTE_BLOCKS)) { 4710 ndb_write_new_blocks(txn, note->note, note_key, scratch, scratch_size); 4711 } 4712 } else if (kind == 7 && !ndb_flag_set(ndb_flags, NDB_FLAG_NO_STATS)) { 4713 ndb_write_reaction_stats(txn, note->note); 4714 } 4715 4716 return note_key; 4717 } 4718 4719 static void ndb_monitor_lock(struct ndb_monitor *mon) { 4720 pthread_mutex_lock(&mon->mutex); 4721 } 4722 4723 static void ndb_monitor_unlock(struct ndb_monitor *mon) { 4724 
pthread_mutex_unlock(&mon->mutex); 4725 } 4726 4727 struct written_note { 4728 uint64_t note_id; 4729 struct ndb_writer_note *note; 4730 }; 4731 4732 // When the data has been committed to the database, take all of the written 4733 // notes, check them against subscriptions, and then write to the subscription 4734 // inbox for all matching notes 4735 static void ndb_notify_subscriptions(struct ndb_monitor *monitor, 4736 struct written_note *wrote, int num_notes) 4737 { 4738 int i, k; 4739 int pushed; 4740 struct written_note *written; 4741 struct ndb_note *note; 4742 struct ndb_subscription *sub; 4743 4744 ndb_monitor_lock(monitor); 4745 4746 for (i = 0; i < monitor->num_subscriptions; i++) { 4747 sub = &monitor->subscriptions[i]; 4748 ndb_debug("checking subscription %d, %d notes\n", i, num_notes); 4749 4750 pushed = 0; 4751 for (k = 0; k < num_notes; k++) { 4752 written = &wrote[k]; 4753 note = written->note->note; 4754 4755 if (ndb_filter_group_matches(&sub->group, note)) { 4756 ndb_debug("pushing note\n"); 4757 4758 if (!prot_queue_push(&sub->inbox, &written->note_id)) { 4759 ndb_debug("couldn't push note to subscriber"); 4760 } else { 4761 pushed++; 4762 } 4763 } else { 4764 ndb_debug("not pushing note\n"); 4765 } 4766 } 4767 4768 // After pushing all of the matching notes, check to see if we 4769 // have a registered subscription callback. If so, we call it. 4770 // The callback needs to call ndb_poll_for_notes to pull data 4771 // that was just pushed to the queue in the for loop above. 
4772 if (monitor->sub_cb != NULL && pushed > 0) { 4773 monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); 4774 } 4775 } 4776 4777 ndb_monitor_unlock(monitor); 4778 } 4779 4780 uint64_t ndb_write_note_and_profile( 4781 struct ndb_txn *txn, 4782 struct ndb_writer_profile *profile, 4783 unsigned char *scratch, 4784 size_t scratch_size, 4785 uint32_t ndb_flags) 4786 { 4787 uint64_t note_nkey; 4788 4789 note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags); 4790 4791 if (profile->record.builder) { 4792 // only write if parsing didn't fail 4793 ndb_write_profile(txn, profile, note_nkey); 4794 } 4795 4796 return note_nkey; 4797 } 4798 4799 // only to be called from the writer thread 4800 static int ndb_write_version(struct ndb_txn *txn, uint64_t version) 4801 { 4802 int rc; 4803 MDB_val key, val; 4804 uint64_t version_key; 4805 4806 version_key = NDB_META_KEY_VERSION; 4807 4808 key.mv_data = &version_key; 4809 key.mv_size = sizeof(version_key); 4810 val.mv_data = &version; 4811 val.mv_size = sizeof(version); 4812 4813 if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { 4814 ndb_debug("write version to ndb_meta failed: %s\n", 4815 mdb_strerror(rc)); 4816 return 0; 4817 } 4818 4819 //fprintf(stderr, "writing version %" PRIu64 "\n", version); 4820 return 1; 4821 } 4822 4823 4824 static int ndb_run_migrations(struct ndb_txn *txn) 4825 { 4826 int64_t version, latest_version, i; 4827 4828 latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); 4829 4830 if ((version = ndb_db_version(txn)) == -1) { 4831 ndb_debug("run_migrations: no version found, assuming new db\n"); 4832 version = latest_version; 4833 4834 // no version found. fresh db? 
4835 if (!ndb_write_version(txn, version)) { 4836 fprintf(stderr, "run_migrations: failed writing db version"); 4837 return 0; 4838 } 4839 4840 return 1; 4841 } else { 4842 ndb_debug("ndb: version %" PRIu64 " found\n", version); 4843 } 4844 4845 if (version < latest_version) 4846 fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", 4847 (int)version, (int)latest_version); 4848 4849 for (i = version; i < latest_version; i++) { 4850 if (!MIGRATIONS[i].fn(txn)) { 4851 fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); 4852 return 0; 4853 } 4854 4855 if (!ndb_write_version(txn, i+1)) { 4856 fprintf(stderr, "run_migrations: failed writing db version"); 4857 return 0; 4858 } 4859 4860 version = i+1; 4861 } 4862 4863 return 1; 4864 } 4865 4866 4867 static void *ndb_writer_thread(void *data) 4868 { 4869 ndb_debug("started writer thread\n"); 4870 struct ndb_writer *writer = data; 4871 struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; 4872 struct written_note written_notes[THREAD_QUEUE_BATCH]; 4873 int i, popped, done, relay_len, needs_commit, num_notes; 4874 uint64_t note_nkey; 4875 struct ndb_txn txn; 4876 unsigned char *scratch; 4877 4878 // 2MB scratch buffer for parsing note content 4879 scratch = malloc(writer->scratch_size); 4880 MDB_txn *mdb_txn = NULL; 4881 ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); 4882 4883 done = 0; 4884 while (!done) { 4885 txn.mdb_txn = NULL; 4886 num_notes = 0; 4887 ndb_debug("writer waiting for items\n"); 4888 popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); 4889 ndb_debug("writer popped %d items\n", popped); 4890 4891 needs_commit = 0; 4892 for (i = 0 ; i < popped; i++) { 4893 msg = &msgs[i]; 4894 switch (msg->type) { 4895 case NDB_WRITER_NOTE: needs_commit = 1; break; 4896 case NDB_WRITER_PROFILE: needs_commit = 1; break; 4897 case NDB_WRITER_DBMETA: needs_commit = 1; break; 4898 case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; 4899 case NDB_WRITER_BLOCKS: 
needs_commit = 1; break; 4900 case NDB_WRITER_MIGRATE: needs_commit = 1; break; 4901 case NDB_WRITER_NOTE_RELAY: needs_commit = 1; break; 4902 case NDB_WRITER_QUIT: break; 4903 } 4904 } 4905 4906 if (needs_commit && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) 4907 { 4908 fprintf(stderr, "writer thread txn_begin failed"); 4909 // should definitely not happen unless DB is full 4910 // or something ? 4911 continue; 4912 } 4913 4914 for (i = 0; i < popped; i++) { 4915 msg = &msgs[i]; 4916 4917 switch (msg->type) { 4918 case NDB_WRITER_QUIT: 4919 // quits are handled before this 4920 ndb_debug("writer thread got quit message\n"); 4921 done = 1; 4922 continue; 4923 case NDB_WRITER_PROFILE: 4924 note_nkey = 4925 ndb_write_note_and_profile( 4926 &txn, 4927 &msg->profile, 4928 scratch, 4929 writer->scratch_size, 4930 writer->ndb_flags); 4931 4932 if (note_nkey > 0) { 4933 written_notes[num_notes++] = 4934 (struct written_note){ 4935 .note_id = note_nkey, 4936 .note = &msg->profile.note, 4937 }; 4938 } else { 4939 ndb_debug("failed to write note\n"); 4940 } 4941 break; 4942 case NDB_WRITER_NOTE: 4943 note_nkey = ndb_write_note(&txn, &msg->note, 4944 scratch, 4945 writer->scratch_size, 4946 writer->ndb_flags); 4947 4948 if (note_nkey > 0) { 4949 written_notes[num_notes++] = (struct written_note){ 4950 .note_id = note_nkey, 4951 .note = &msg->note, 4952 }; 4953 } 4954 break; 4955 case NDB_WRITER_NOTE_RELAY: 4956 relay_len = strlen(msg->note_relay.relay); 4957 ndb_write_note_relay(&txn, 4958 msg->note_relay.note_key, 4959 msg->note_relay.relay, 4960 relay_len); 4961 ndb_write_note_relay_kind_index( 4962 &txn, 4963 msg->note_relay.kind, 4964 msg->note_relay.note_key, 4965 msg->note_relay.created_at, 4966 msg->note_relay.relay, 4967 relay_len); 4968 break; 4969 case NDB_WRITER_DBMETA: 4970 ndb_write_version(&txn, msg->ndb_meta.version); 4971 break; 4972 case NDB_WRITER_BLOCKS: 4973 ndb_write_blocks(&txn, msg->blocks.note_key, 4974 msg->blocks.blocks); 4975 
break; 4976 case NDB_WRITER_MIGRATE: 4977 if (!ndb_run_migrations(&txn)) { 4978 mdb_txn_abort(txn.mdb_txn); 4979 goto bail; 4980 } 4981 break; 4982 case NDB_WRITER_PROFILE_LAST_FETCH: 4983 ndb_writer_last_profile_fetch(&txn, 4984 msg->last_fetch.pubkey, 4985 msg->last_fetch.fetched_at 4986 ); 4987 break; 4988 } 4989 } 4990 4991 // commit writes 4992 if (needs_commit) { 4993 if (!ndb_end_query(&txn)) { 4994 ndb_debug("writer thread txn commit failed\n"); 4995 } else { 4996 ndb_debug("notifying subscriptions, %d notes\n", num_notes); 4997 ndb_notify_subscriptions(writer->monitor, 4998 written_notes, 4999 num_notes); 5000 // update subscriptions 5001 } 5002 } 5003 5004 // free notes 5005 for (i = 0; i < popped; i++) { 5006 msg = &msgs[i]; 5007 if (msg->type == NDB_WRITER_NOTE) { 5008 free(msg->note.note); 5009 if (msg->note.relay) 5010 free((void*)msg->note.relay); 5011 } else if (msg->type == NDB_WRITER_PROFILE) { 5012 free(msg->profile.note.note); 5013 //ndb_profile_record_builder_free(&msg->profile.record); 5014 } else if (msg->type == NDB_WRITER_BLOCKS) { 5015 ndb_blocks_free(msg->blocks.blocks); 5016 } else if (msg->type == NDB_WRITER_NOTE_RELAY) { 5017 free((void*)msg->note_relay.relay); 5018 } 5019 } 5020 } 5021 5022 bail: 5023 free(scratch); 5024 ndb_debug("quitting writer thread\n"); 5025 return NULL; 5026 } 5027 5028 static void *ndb_ingester_thread(void *data) 5029 { 5030 secp256k1_context *ctx; 5031 struct thread *thread = data; 5032 struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; 5033 struct ndb_lmdb *lmdb = ingester->lmdb; 5034 struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; 5035 struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; 5036 int i, to_write, popped, done, any_event; 5037 MDB_txn *read_txn = NULL; 5038 int rc; 5039 5040 ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY); 5041 ndb_debug("started ingester thread\n"); 5042 5043 done = 0; 5044 while (!done) { 5045 to_write = 0; 5046 any_event = 0; 5047 5048 
popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); 5049 ndb_debug("ingester popped %d items\n", popped); 5050 5051 for (i = 0; i < popped; i++) { 5052 msg = &msgs[i]; 5053 if (msg->type == NDB_INGEST_EVENT) { 5054 any_event = 1; 5055 break; 5056 } 5057 } 5058 5059 if (any_event && (rc = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &read_txn))) { 5060 // this is bad 5061 fprintf(stderr, "UNUSUAL ndb_ingester: mdb_txn_begin failed: '%s'\n", 5062 mdb_strerror(rc)); 5063 continue; 5064 } 5065 5066 for (i = 0; i < popped; i++) { 5067 msg = &msgs[i]; 5068 switch (msg->type) { 5069 case NDB_INGEST_QUIT: 5070 done = 1; 5071 break; 5072 5073 case NDB_INGEST_EVENT: 5074 out = &outs[to_write]; 5075 if (ndb_ingester_process_event(ctx, ingester, 5076 &msg->event, out, 5077 read_txn)) { 5078 to_write++; 5079 } 5080 } 5081 } 5082 5083 if (any_event) 5084 mdb_txn_abort(read_txn); 5085 5086 if (to_write > 0) { 5087 ndb_debug("pushing %d events to write queue\n", to_write); 5088 if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) { 5089 ndb_debug("failed pushing %d events to write queue\n", to_write); 5090 } 5091 } 5092 } 5093 5094 ndb_debug("quitting ingester thread\n"); 5095 secp256k1_context_destroy(ctx); 5096 return NULL; 5097 } 5098 5099 5100 static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, 5101 struct ndb_monitor *monitor, uint32_t ndb_flags, 5102 int scratch_size) 5103 { 5104 writer->lmdb = lmdb; 5105 writer->monitor = monitor; 5106 writer->ndb_flags = ndb_flags; 5107 writer->scratch_size = scratch_size; 5108 writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; 5109 writer->queue_buf = malloc(writer->queue_buflen); 5110 if (writer->queue_buf == NULL) { 5111 fprintf(stderr, "ndb: failed to allocate space for writer queue"); 5112 return 0; 5113 } 5114 5115 // init the writer queue. 
5116 prot_queue_init(&writer->inbox, writer->queue_buf, 5117 writer->queue_buflen, sizeof(struct ndb_writer_msg)); 5118 5119 // spin up the writer thread 5120 if (THREAD_CREATE(writer->thread_id, ndb_writer_thread, writer)) 5121 { 5122 fprintf(stderr, "ndb writer thread failed to create\n"); 5123 return 0; 5124 } 5125 5126 return 1; 5127 } 5128 5129 // initialize the ingester queue and then spawn the thread 5130 static int ndb_ingester_init(struct ndb_ingester *ingester, 5131 struct ndb_lmdb *lmdb, 5132 struct prot_queue *writer_inbox, 5133 const struct ndb_config *config) 5134 { 5135 int elem_size, num_elems; 5136 static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT }; 5137 5138 // TODO: configurable queue sizes 5139 elem_size = sizeof(struct ndb_ingester_msg); 5140 num_elems = DEFAULT_QUEUE_SIZE; 5141 5142 ingester->writer_inbox = writer_inbox; 5143 ingester->lmdb = lmdb; 5144 ingester->flags = config->flags; 5145 ingester->filter = config->ingest_filter; 5146 ingester->filter_context = config->filter_context; 5147 5148 if (!threadpool_init(&ingester->tp, config->ingester_threads, 5149 elem_size, num_elems, &quit_msg, ingester, 5150 ndb_ingester_thread)) 5151 { 5152 fprintf(stderr, "ndb ingester threadpool failed to init\n"); 5153 return 0; 5154 } 5155 5156 return 1; 5157 } 5158 5159 static int ndb_writer_destroy(struct ndb_writer *writer) 5160 { 5161 struct ndb_writer_msg msg; 5162 5163 // kill thread 5164 msg.type = NDB_WRITER_QUIT; 5165 ndb_debug("writer: pushing quit message\n"); 5166 if (!prot_queue_push(&writer->inbox, &msg)) { 5167 // queue is too full to push quit message. just kill it. 
5168 ndb_debug("writer: terminating thread\n"); 5169 THREAD_TERMINATE(writer->thread_id); 5170 } else { 5171 ndb_debug("writer: joining thread\n"); 5172 THREAD_FINISH(writer->thread_id); 5173 } 5174 5175 // cleanup 5176 ndb_debug("writer: cleaning up protected queue\n"); 5177 prot_queue_destroy(&writer->inbox); 5178 5179 free(writer->queue_buf); 5180 5181 return 1; 5182 } 5183 5184 static int ndb_ingester_destroy(struct ndb_ingester *ingester) 5185 { 5186 threadpool_destroy(&ingester->tp); 5187 return 1; 5188 } 5189 5190 static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize) 5191 { 5192 int rc; 5193 MDB_txn *txn; 5194 5195 if ((rc = mdb_env_create(&lmdb->env))) { 5196 fprintf(stderr, "mdb_env_create failed, error %d\n", rc); 5197 return 0; 5198 } 5199 5200 if ((rc = mdb_env_set_mapsize(lmdb->env, mapsize))) { 5201 fprintf(stderr, "mdb_env_set_mapsize failed, error %d\n", rc); 5202 return 0; 5203 } 5204 5205 if ((rc = mdb_env_set_maxdbs(lmdb->env, NDB_DBS))) { 5206 fprintf(stderr, "mdb_env_set_maxdbs failed, error %d\n", rc); 5207 return 0; 5208 } 5209 5210 if ((rc = mdb_env_open(lmdb->env, filename, 0, 0664))) { 5211 fprintf(stderr, "mdb_env_open failed, error %d\n", rc); 5212 return 0; 5213 } 5214 5215 // Initialize DBs 5216 if ((rc = mdb_txn_begin(lmdb->env, NULL, 0, &txn))) { 5217 fprintf(stderr, "mdb_txn_begin failed, error %d\n", rc); 5218 return 0; 5219 } 5220 5221 // note flatbuffer db 5222 if ((rc = mdb_dbi_open(txn, "note", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NOTE]))) { 5223 fprintf(stderr, "mdb_dbi_open event failed, error %d\n", rc); 5224 return 0; 5225 } 5226 5227 // note metadata db 5228 if ((rc = mdb_dbi_open(txn, "meta", MDB_CREATE, &lmdb->dbs[NDB_DB_META]))) { 5229 fprintf(stderr, "mdb_dbi_open meta failed, error %d\n", rc); 5230 return 0; 5231 } 5232 5233 // profile flatbuffer db 5234 if ((rc = mdb_dbi_open(txn, "profile", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_PROFILE]))) { 5235 fprintf(stderr, 
"mdb_dbi_open profile failed, error %d\n", rc); 5236 return 0; 5237 } 5238 5239 // profile search db 5240 if ((rc = mdb_dbi_open(txn, "profile_search", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_SEARCH]))) { 5241 fprintf(stderr, "mdb_dbi_open profile_search failed, error %d\n", rc); 5242 return 0; 5243 } 5244 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_SEARCH], ndb_search_key_cmp); 5245 5246 // ndb metadata (db version, etc) 5247 if ((rc = mdb_dbi_open(txn, "ndb_meta", MDB_CREATE | MDB_INTEGERKEY, &lmdb->dbs[NDB_DB_NDB_META]))) { 5248 fprintf(stderr, "mdb_dbi_open ndb_meta failed, error %d\n", rc); 5249 return 0; 5250 } 5251 5252 // profile last fetches 5253 if ((rc = mdb_dbi_open(txn, "profile_last_fetch", MDB_CREATE, &lmdb->dbs[NDB_DB_PROFILE_LAST_FETCH]))) { 5254 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 5255 return 0; 5256 } 5257 5258 // relay kind index. maps <relay_url><kind><created><note_id> primary keys to relay records 5259 // see ndb_relay_kind_cmp function for more details on the key format 5260 if ((rc = mdb_dbi_open(txn, "relay_kind", MDB_CREATE, &lmdb->dbs[NDB_DB_NOTE_RELAY_KIND]))) { 5261 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 5262 return 0; 5263 } 5264 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], ndb_relay_kind_cmp); 5265 5266 // note_id -> relay index 5267 if ((rc = mdb_dbi_open(txn, "note_relays", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, &lmdb->dbs[NDB_DB_NOTE_RELAYS]))) { 5268 fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); 5269 return 0; 5270 } 5271 5272 // id+ts index flags 5273 unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; 5274 5275 // index dbs 5276 if ((rc = mdb_dbi_open(txn, "note_id", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_ID]))) { 5277 fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc)); 5278 return 0; 5279 } 5280 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare); 5281 5282 if ((rc = mdb_dbi_open(txn, 
"profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) { 5283 fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc)); 5284 return 0; 5285 } 5286 mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare); 5287 5288 if ((rc = mdb_dbi_open(txn, "note_kind", 5289 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 5290 &lmdb->dbs[NDB_DB_NOTE_KIND]))) { 5291 fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc)); 5292 return 0; 5293 } 5294 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_ts_compare); 5295 5296 if ((rc = mdb_dbi_open(txn, "note_pubkey", 5297 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 5298 &lmdb->dbs[NDB_DB_NOTE_PUBKEY]))) { 5299 fprintf(stderr, "mdb_dbi_open note_pubkey failed: %s\n", mdb_strerror(rc)); 5300 return 0; 5301 } 5302 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY], ndb_tsid_compare); 5303 5304 if ((rc = mdb_dbi_open(txn, "note_pubkey_kind", 5305 MDB_CREATE | MDB_DUPSORT | MDB_INTEGERDUP | MDB_DUPFIXED, 5306 &lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND]))) { 5307 fprintf(stderr, "mdb_dbi_open note_pubkey_kind failed: %s\n", mdb_strerror(rc)); 5308 return 0; 5309 } 5310 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_PUBKEY_KIND], ndb_id_u64_ts_compare); 5311 5312 if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT, 5313 &lmdb->dbs[NDB_DB_NOTE_TEXT]))) { 5314 fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc)); 5315 return 0; 5316 } 5317 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare); 5318 5319 if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY, 5320 &lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) { 5321 fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc)); 5322 return 0; 5323 } 5324 5325 if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, 5326 &lmdb->dbs[NDB_DB_NOTE_TAGS]))) { 5327 fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", 
mdb_strerror(rc)); 5328 return 0; 5329 } 5330 mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare); 5331 5332 // Commit the transaction 5333 if ((rc = mdb_txn_commit(txn))) { 5334 fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc); 5335 return 0; 5336 } 5337 5338 return 1; 5339 } 5340 5341 static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) 5342 { 5343 struct ndb_writer_msg msg; 5344 msg.type = NDB_WRITER_DBMETA; 5345 msg.ndb_meta.version = version; 5346 return ndb_writer_queue_msg(&ndb->writer, &msg); 5347 } 5348 5349 static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, 5350 void *sub_cb_ctx) 5351 { 5352 monitor->num_subscriptions = 0; 5353 monitor->sub_cb = cb; 5354 monitor->sub_cb_ctx = sub_cb_ctx; 5355 pthread_mutex_init(&monitor->mutex, NULL); 5356 } 5357 5358 void ndb_filter_group_destroy(struct ndb_filter_group *group) 5359 { 5360 struct ndb_filter *filter; 5361 int i; 5362 for (i = 0; i < group->num_filters; i++) { 5363 filter = &group->filters[i]; 5364 ndb_filter_destroy(filter); 5365 } 5366 } 5367 5368 static void ndb_subscription_destroy(struct ndb_subscription *sub) 5369 { 5370 ndb_filter_group_destroy(&sub->group); 5371 // this was malloc'd inside ndb_subscribe 5372 free(sub->inbox.buf); 5373 prot_queue_destroy(&sub->inbox); 5374 sub->subid = 0; 5375 } 5376 5377 static void ndb_monitor_destroy(struct ndb_monitor *monitor) 5378 { 5379 int i; 5380 5381 ndb_monitor_lock(monitor); 5382 5383 for (i = 0; i < monitor->num_subscriptions; i++) { 5384 ndb_subscription_destroy(&monitor->subscriptions[i]); 5385 } 5386 5387 monitor->num_subscriptions = 0; 5388 5389 ndb_monitor_unlock(monitor); 5390 5391 pthread_mutex_destroy(&monitor->mutex); 5392 } 5393 5394 int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config) 5395 { 5396 struct ndb *ndb; 5397 //MDB_dbi ind_id; // TODO: ind_pk, etc 5398 5399 ndb = *pndb = calloc(1, sizeof(struct ndb)); 5400 ndb->flags = config->flags; 
5401 5402 if (ndb == NULL) { 5403 fprintf(stderr, "ndb_init: malloc failed\n"); 5404 return 0; 5405 } 5406 5407 if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) 5408 return 0; 5409 5410 ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); 5411 5412 if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags, 5413 config->writer_scratch_buffer_size)) { 5414 fprintf(stderr, "ndb_writer_init failed\n"); 5415 return 0; 5416 } 5417 5418 if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) { 5419 fprintf(stderr, "failed to initialize %d ingester thread(s)\n", 5420 config->ingester_threads); 5421 return 0; 5422 } 5423 5424 if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) { 5425 struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE }; 5426 ndb_writer_queue_msg(&ndb->writer, &msg); 5427 } 5428 5429 // Initialize LMDB environment and spin up threads 5430 return 1; 5431 } 5432 5433 void ndb_destroy(struct ndb *ndb) 5434 { 5435 if (ndb == NULL) 5436 return; 5437 5438 // ingester depends on writer and must be destroyed first 5439 ndb_debug("destroying ingester\n"); 5440 ndb_ingester_destroy(&ndb->ingester); 5441 ndb_debug("destroying writer\n"); 5442 ndb_writer_destroy(&ndb->writer); 5443 ndb_debug("destroying monitor\n"); 5444 ndb_monitor_destroy(&ndb->monitor); 5445 5446 ndb_debug("closing env\n"); 5447 mdb_env_close(ndb->lmdb.env); 5448 5449 ndb_debug("ndb destroyed\n"); 5450 free(ndb); 5451 } 5452 5453 5454 // Process a nostr event from a client 5455 // 5456 // ie: ["EVENT", {"content":"..."} ...] 5457 // 5458 // The client-sent variation of ndb_process_event 5459 int ndb_process_client_event(struct ndb *ndb, const char *json, int len) 5460 { 5461 struct ndb_ingest_meta meta; 5462 ndb_ingest_meta_init(&meta, 1, NULL); 5463 5464 return ndb_ingest_event(&ndb->ingester, json, len, &meta); 5465 } 5466 5467 // Process anostr event from a relay, 5468 // 5469 // ie: ["EVENT", "subid", {"content":"..."}...] 
#ifndef _WIN32
// TODO: windows
//
// Read events from a stream, one per line, and hand each line to
// ndb_process_event.
//
// ndb: database instance the events are queued on
// fp:  stream to read lines from until getline reports end of input or error
//
// The result of ndb_process_event is not inspected, so a line that fails to
// queue does not stop the loop. Always returns 1.
int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
{
	char *line = NULL;
	size_t cap = 0;
	ssize_t nread;

	while ((nread = getline(&line, &cap, fp)) != -1) {
		if (line == NULL)
			break;

		// ndb_process_event takes an int length
		if (nread > INT_MAX)
			continue;

		// getline's size argument is the capacity of the buffer, not
		// the length of the line; the number of bytes read (newline
		// included) is its return value
		ndb_process_event(ndb, line, (int)nread);
	}

	free(line);

	return 1;
}
#endif
0, sizeof(*note)); 5605 builder->note_cur.p += sizeof(*note); 5606 5607 note->strings = builder->strings.start - buf; 5608 note->version = 1; 5609 5610 return 1; 5611 } 5612 5613 5614 5615 static inline int ndb_json_parser_init(struct ndb_json_parser *p, 5616 const char *json, int json_len, 5617 unsigned char *buf, int bufsize) 5618 { 5619 int half = bufsize / 2; 5620 5621 unsigned char *tok_start = buf + half; 5622 unsigned char *tok_end = buf + bufsize; 5623 5624 p->toks = (jsmntok_t*)tok_start; 5625 p->toks_end = (jsmntok_t*)tok_end; 5626 p->num_tokens = 0; 5627 p->json = json; 5628 p->json_len = json_len; 5629 5630 // ndb_builder gets the first half of the buffer, and jsmn gets the 5631 // second half. I like this way of alloating memory (without actually 5632 // dynamically allocating memory). You get one big chunk upfront and 5633 // then submodules can recursively subdivide it. Maybe you could do 5634 // something even more clever like golden-ratio style subdivision where 5635 // the more important stuff gets a larger chunk and then it spirals 5636 // downward into smaller chunks. Thanks for coming to my TED talk. 5637 5638 if (!ndb_builder_init(&p->builder, buf, half)) 5639 return 0; 5640 5641 jsmn_init(&p->json_parser); 5642 5643 return 1; 5644 } 5645 5646 static inline int ndb_json_parser_parse(struct ndb_json_parser *p, 5647 struct ndb_id_cb *cb) 5648 { 5649 jsmntok_t *tok; 5650 int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); 5651 int res = 5652 jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); 5653 5654 // got an ID! 
5655 if (res == -42) { 5656 tok = &p->toks[p->json_parser.toknext-1]; 5657 5658 switch (cb->fn(cb->data, p->json + tok->start)) { 5659 case NDB_IDRES_CONT: 5660 res = jsmn_parse(&p->json_parser, p->json, p->json_len, 5661 p->toks, cap, 0); 5662 break; 5663 case NDB_IDRES_STOP: 5664 return -42; 5665 } 5666 } else if (res == 0) { 5667 return 0; 5668 } 5669 5670 p->num_tokens = res; 5671 p->i = 0; 5672 5673 return 1; 5674 } 5675 5676 static inline int toksize(jsmntok_t *tok) 5677 { 5678 return tok->end - tok->start; 5679 } 5680 5681 5682 5683 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2) 5684 { 5685 switch (c2) { 5686 case 't': return cursor_push_byte(cur, '\t'); 5687 case 'n': return cursor_push_byte(cur, '\n'); 5688 case 'r': return cursor_push_byte(cur, '\r'); 5689 case 'b': return cursor_push_byte(cur, '\b'); 5690 case 'f': return cursor_push_byte(cur, '\f'); 5691 case '\\': return cursor_push_byte(cur, '\\'); 5692 case '/': return cursor_push_byte(cur, '/'); 5693 case '"': return cursor_push_byte(cur, '"'); 5694 case 'u': 5695 // these aren't handled yet 5696 return 0; 5697 default: 5698 return cursor_push_byte(cur, c1) && cursor_push_byte(cur, c2); 5699 } 5700 } 5701 5702 static int cursor_push_escaped_char(struct cursor *cur, char c) 5703 { 5704 switch (c) { 5705 case '"': return cursor_push_str(cur, "\\\""); 5706 case '\\': return cursor_push_str(cur, "\\\\"); 5707 case '\b': return cursor_push_str(cur, "\\b"); 5708 case '\f': return cursor_push_str(cur, "\\f"); 5709 case '\n': return cursor_push_str(cur, "\\n"); 5710 case '\r': return cursor_push_str(cur, "\\r"); 5711 case '\t': return cursor_push_str(cur, "\\t"); 5712 // TODO: \u hex hex hex hex 5713 } 5714 return cursor_push_byte(cur, c); 5715 } 5716 5717 static int cursor_push_hex_str(struct cursor *cur, unsigned char *buf, int len) 5718 { 5719 int i; 5720 5721 if (len % 2 != 0) 5722 return 0; 5723 5724 if (!cursor_push_byte(cur, '"')) 5725 return 0; 5726 5727 for (i = 0; i < 
len; i++) { 5728 unsigned int c = ((const unsigned char *)buf)[i]; 5729 if (!cursor_push_byte(cur, hexchar(c >> 4))) 5730 return 0; 5731 if (!cursor_push_byte(cur, hexchar(c & 0xF))) 5732 return 0; 5733 } 5734 5735 if (!cursor_push_byte(cur, '"')) 5736 return 0; 5737 5738 return 1; 5739 } 5740 5741 static int cursor_push_jsonstr(struct cursor *cur, const char *str) 5742 { 5743 int i; 5744 int len; 5745 5746 len = strlen(str); 5747 5748 if (!cursor_push_byte(cur, '"')) 5749 return 0; 5750 5751 for (i = 0; i < len; i++) { 5752 if (!cursor_push_escaped_char(cur, str[i])) 5753 return 0; 5754 } 5755 5756 if (!cursor_push_byte(cur, '"')) 5757 return 0; 5758 5759 return 1; 5760 } 5761 5762 5763 static inline int cursor_push_json_tag_str(struct cursor *cur, struct ndb_str str) 5764 { 5765 if (str.flag == NDB_PACKED_ID) 5766 return cursor_push_hex_str(cur, str.id, 32); 5767 5768 return cursor_push_jsonstr(cur, str.str); 5769 } 5770 5771 static int cursor_push_json_tag(struct cursor *cur, struct ndb_note *note, 5772 struct ndb_tag *tag) 5773 { 5774 int i; 5775 5776 if (!cursor_push_byte(cur, '[')) 5777 return 0; 5778 5779 for (i = 0; i < tag->count; i++) { 5780 if (!cursor_push_json_tag_str(cur, ndb_tag_str(note, tag, i))) 5781 return 0; 5782 if (i != tag->count-1 && !cursor_push_byte(cur, ',')) 5783 return 0; 5784 } 5785 5786 return cursor_push_byte(cur, ']'); 5787 } 5788 5789 static int cursor_push_json_tags(struct cursor *cur, struct ndb_note *note) 5790 { 5791 int i; 5792 struct ndb_iterator iter, *it = &iter; 5793 ndb_tags_iterate_start(note, it); 5794 5795 if (!cursor_push_byte(cur, '[')) 5796 return 0; 5797 5798 i = 0; 5799 while (ndb_tags_iterate_next(it)) { 5800 if (!cursor_push_json_tag(cur, note, it->tag)) 5801 return 0; 5802 if (i != note->tags.count-1 && !cursor_push_str(cur, ",")) 5803 return 0; 5804 i++; 5805 } 5806 5807 if (!cursor_push_byte(cur, ']')) 5808 return 0; 5809 5810 return 1; 5811 } 5812 5813 static int ndb_event_commitment(struct ndb_note *ev, 
unsigned char *buf, int buflen) 5814 { 5815 char timebuf[16] = {0}; 5816 char kindbuf[16] = {0}; 5817 char pubkey[65]; 5818 struct cursor cur; 5819 int ok; 5820 5821 if (!hex_encode(ev->pubkey, sizeof(ev->pubkey), pubkey)) 5822 return 0; 5823 5824 make_cursor(buf, buf + buflen, &cur); 5825 5826 // TODO: update in 2106 ... 5827 snprintf(timebuf, sizeof(timebuf), "%d", (uint32_t)ev->created_at); 5828 snprintf(kindbuf, sizeof(kindbuf), "%d", ev->kind); 5829 5830 ok = 5831 cursor_push_str(&cur, "[0,\"") && 5832 cursor_push_str(&cur, pubkey) && 5833 cursor_push_str(&cur, "\",") && 5834 cursor_push_str(&cur, timebuf) && 5835 cursor_push_str(&cur, ",") && 5836 cursor_push_str(&cur, kindbuf) && 5837 cursor_push_str(&cur, ",") && 5838 cursor_push_json_tags(&cur, ev) && 5839 cursor_push_str(&cur, ",") && 5840 cursor_push_jsonstr(&cur, ndb_note_str(ev, &ev->content).str) && 5841 cursor_push_str(&cur, "]"); 5842 5843 if (!ok) 5844 return 0; 5845 5846 return cur.p - cur.start; 5847 } 5848 5849 static int cursor_push_hex(struct cursor *c, unsigned char *bytes, int len) 5850 { 5851 int i; 5852 unsigned char chr; 5853 if (c->p + (len * 2) >= c->end) 5854 return 0; 5855 5856 for (i = 0; i < len; i++) { 5857 chr = bytes[i]; 5858 5859 *(c->p++) = hexchar(chr >> 4); 5860 *(c->p++) = hexchar(chr & 0xF); 5861 } 5862 5863 return 1; 5864 } 5865 5866 static int cursor_push_int_str(struct cursor *c, uint64_t num) 5867 { 5868 char timebuf[16] = {0}; 5869 snprintf(timebuf, sizeof(timebuf), "%" PRIu64, num); 5870 return cursor_push_str(c, timebuf); 5871 } 5872 5873 int ndb_note_json(struct ndb_note *note, char *buf, int buflen) 5874 { 5875 struct cursor cur, *c = &cur; 5876 5877 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, &cur); 5878 5879 int ok = cursor_push_str(c, "{\"id\":\"") && 5880 cursor_push_hex(c, ndb_note_id(note), 32) && 5881 cursor_push_str(c, "\",\"pubkey\":\"") && 5882 cursor_push_hex(c, ndb_note_pubkey(note), 32) && 5883 cursor_push_str(c, 
"\",\"created_at\":") && 5884 cursor_push_int_str(c, ndb_note_created_at(note)) && 5885 cursor_push_str(c, ",\"kind\":") && 5886 cursor_push_int_str(c, ndb_note_kind(note)) && 5887 cursor_push_str(c, ",\"tags\":") && 5888 cursor_push_json_tags(c, note) && 5889 cursor_push_str(c, ",\"content\":") && 5890 cursor_push_jsonstr(c, ndb_note_content(note)) && 5891 cursor_push_str(c, ",\"sig\":\"") && 5892 cursor_push_hex(c, ndb_note_sig(note), 64) && 5893 cursor_push_c_str(c, "\"}"); 5894 5895 if (!ok) { 5896 return 0; 5897 } 5898 5899 return cur.p - cur.start; 5900 } 5901 5902 static int cursor_push_json_elem_array(struct cursor *cur, 5903 const struct ndb_filter *filter, 5904 struct ndb_filter_elements *elems) 5905 { 5906 int i; 5907 unsigned char *id; 5908 const char *str; 5909 uint64_t val; 5910 5911 if (!cursor_push_byte(cur, '[')) 5912 return 0; 5913 5914 for (i = 0; i < elems->count; i++) { 5915 5916 switch (elems->field.elem_type) { 5917 case NDB_ELEMENT_STRING: 5918 str = ndb_filter_get_string_element(filter, elems, i); 5919 if (!cursor_push_jsonstr(cur, str)) 5920 return 0; 5921 break; 5922 case NDB_ELEMENT_ID: 5923 id = ndb_filter_get_id_element(filter, elems, i); 5924 if (!cursor_push_hex_str(cur, id, 32)) 5925 return 0; 5926 break; 5927 case NDB_ELEMENT_INT: 5928 val = ndb_filter_get_int_element(elems, i); 5929 if (!cursor_push_int_str(cur, val)) 5930 return 0; 5931 break; 5932 case NDB_ELEMENT_UNKNOWN: 5933 ndb_debug("unknown element in cursor_push_json_elem_array"); 5934 return 0; 5935 } 5936 5937 if (i != elems->count-1) { 5938 if (!cursor_push_byte(cur, ',')) 5939 return 0; 5940 } 5941 } 5942 5943 if (!cursor_push_str(cur, "]")) 5944 return 0; 5945 5946 return 1; 5947 } 5948 5949 int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen) 5950 { 5951 const char *str; 5952 struct cursor cur, *c = &cur; 5953 struct ndb_filter_elements *elems; 5954 int i; 5955 5956 if (!filter->finalized) { 5957 ndb_debug("filter not finalized in 
ndb_filter_json\n"); 5958 return 0; 5959 } 5960 5961 make_cursor((unsigned char *)buf, (unsigned char*)buf + buflen, c); 5962 5963 if (!cursor_push_str(c, "{")) 5964 return 0; 5965 5966 for (i = 0; i < filter->num_elements; i++) { 5967 elems = ndb_filter_get_elements(filter, i); 5968 switch (elems->field.type) { 5969 case NDB_FILTER_IDS: 5970 if (!cursor_push_str(c, "\"ids\":")) 5971 return 0; 5972 if (!cursor_push_json_elem_array(c, filter, elems)) 5973 return 0; 5974 break; 5975 case NDB_FILTER_SEARCH: 5976 if (!cursor_push_str(c, "\"search\":")) 5977 return 0; 5978 if (!(str = ndb_filter_get_string_element(filter, elems, 0))) 5979 return 0; 5980 if (!cursor_push_jsonstr(c, str)) 5981 return 0; 5982 break; 5983 case NDB_FILTER_AUTHORS: 5984 if (!cursor_push_str(c, "\"authors\":")) 5985 return 0; 5986 if (!cursor_push_json_elem_array(c, filter, elems)) 5987 return 0; 5988 break; 5989 case NDB_FILTER_KINDS: 5990 if (!cursor_push_str(c, "\"kinds\":")) 5991 return 0; 5992 if (!cursor_push_json_elem_array(c, filter, elems)) 5993 return 0; 5994 break; 5995 case NDB_FILTER_TAGS: 5996 if (!cursor_push_str(c, "\"#")) 5997 return 0; 5998 if (!cursor_push_byte(c, elems->field.tag)) 5999 return 0; 6000 if (!cursor_push_str(c, "\":")) 6001 return 0; 6002 if (!cursor_push_json_elem_array(c, filter, elems)) 6003 return 0; 6004 break; 6005 case NDB_FILTER_SINCE: 6006 if (!cursor_push_str(c, "\"since\":")) 6007 return 0; 6008 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 6009 return 0; 6010 break; 6011 case NDB_FILTER_UNTIL: 6012 if (!cursor_push_str(c, "\"until\":")) 6013 return 0; 6014 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 6015 return 0; 6016 break; 6017 case NDB_FILTER_LIMIT: 6018 if (!cursor_push_str(c, "\"limit\":")) 6019 return 0; 6020 if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0))) 6021 return 0; 6022 break; 6023 } 6024 6025 if (i != filter->num_elements-1) { 6026 if (!cursor_push_byte(c, ',')) { 6027 
return 0; 6028 } 6029 } 6030 6031 } 6032 6033 if (!cursor_push_c_str(c, "}")) 6034 return 0; 6035 6036 return cur.p - cur.start; 6037 } 6038 6039 int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen) { 6040 int len; 6041 6042 if (!(len = ndb_event_commitment(note, buf, buflen))) 6043 return 0; 6044 6045 //fprintf(stderr, "%.*s\n", len, buf); 6046 6047 sha256((struct sha256*)note->id, buf, len); 6048 6049 return 1; 6050 } 6051 6052 int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], 6053 unsigned char sig[64]) 6054 { 6055 unsigned char aux[32]; 6056 secp256k1_keypair *pair = (secp256k1_keypair*) keypair->pair; 6057 6058 if (!fill_random(aux, sizeof(aux))) 6059 return 0; 6060 6061 secp256k1_context *ctx = 6062 secp256k1_context_create(SECP256K1_CONTEXT_NONE); 6063 6064 return secp256k1_schnorrsig_sign32(ctx, sig, id, pair, aux); 6065 } 6066 6067 int ndb_create_keypair(struct ndb_keypair *kp) 6068 { 6069 secp256k1_keypair *keypair = (secp256k1_keypair*)kp->pair; 6070 secp256k1_xonly_pubkey pubkey; 6071 6072 secp256k1_context *ctx = 6073 secp256k1_context_create(SECP256K1_CONTEXT_NONE);; 6074 6075 /* Try to create a keypair with a valid context, it should only 6076 * fail if the secret key is zero or out of range. */ 6077 if (!secp256k1_keypair_create(ctx, keypair, kp->secret)) 6078 return 0; 6079 6080 if (!secp256k1_keypair_xonly_pub(ctx, &pubkey, NULL, keypair)) 6081 return 0; 6082 6083 /* Serialize the public key. Should always return 1 for a valid public key. 
*/ 6084 return secp256k1_xonly_pubkey_serialize(ctx, kp->pubkey, &pubkey); 6085 } 6086 6087 int ndb_decode_key(const char *secstr, struct ndb_keypair *keypair) 6088 { 6089 if (!hex_decode(secstr, strlen(secstr), keypair->secret, 32)) { 6090 fprintf(stderr, "could not hex decode secret key\n"); 6091 return 0; 6092 } 6093 6094 return ndb_create_keypair(keypair); 6095 } 6096 6097 int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, 6098 struct ndb_keypair *keypair) 6099 { 6100 int strings_len = builder->strings.p - builder->strings.start; 6101 unsigned char *note_end = builder->note_cur.p + strings_len; 6102 int total_size = note_end - builder->note_cur.start; 6103 6104 // move the strings buffer next to the end of our ndb_note 6105 memmove(builder->note_cur.p, builder->strings.start, strings_len); 6106 6107 // set the strings location 6108 builder->note->strings = builder->note_cur.p - builder->note_cur.start; 6109 6110 // record the total size 6111 //builder->note->size = total_size; 6112 6113 *note = builder->note; 6114 6115 // generate id and sign if we're building this manually 6116 if (keypair) { 6117 // use the remaining memory for building our id buffer 6118 unsigned char *end = builder->mem.end; 6119 unsigned char *start = (unsigned char*)(*note) + total_size; 6120 6121 ndb_builder_set_pubkey(builder, keypair->pubkey); 6122 6123 if (!ndb_calculate_id(builder->note, start, end - start)) 6124 return 0; 6125 6126 if (!ndb_sign_id(keypair, (*note)->id, (*note)->sig)) 6127 return 0; 6128 } 6129 6130 // make sure we're aligned as a whole 6131 total_size = (total_size + 7) & ~7; 6132 assert((total_size % 8) == 0); 6133 return total_size; 6134 } 6135 6136 struct ndb_note * ndb_builder_note(struct ndb_builder *builder) 6137 { 6138 return builder->note; 6139 } 6140 6141 static union ndb_packed_str ndb_offset_str(uint32_t offset) 6142 { 6143 // ensure accidents like -1 don't corrupt our packed_str 6144 union ndb_packed_str str; 6145 // most 
significant byte is reserved for ndb_packtype 6146 str.offset = offset & 0xFFFFFF; 6147 return str; 6148 } 6149 6150 6151 /// find an existing string via str_indices. these indices only exist in the 6152 /// builder phase just for this purpose. 6153 static inline int ndb_builder_find_str(struct ndb_builder *builder, 6154 const char *str, int len, 6155 union ndb_packed_str *pstr) 6156 { 6157 // find existing matching string to avoid duplicate strings 6158 int indices = cursor_count(&builder->str_indices, sizeof(uint32_t)); 6159 for (int i = 0; i < indices; i++) { 6160 uint32_t index = ((uint32_t*)builder->str_indices.start)[i]; 6161 const char *some_str = (const char*)builder->strings.start + index; 6162 6163 if (!memcmp(some_str, str, len) && some_str[len] == '\0') { 6164 // found an existing matching str, use that index 6165 *pstr = ndb_offset_str(index); 6166 return 1; 6167 } 6168 } 6169 6170 return 0; 6171 } 6172 6173 static int ndb_builder_push_str(struct ndb_builder *builder, const char *str, 6174 int len, union ndb_packed_str *pstr) 6175 { 6176 uint32_t loc; 6177 6178 // no string found, push a new one 6179 loc = builder->strings.p - builder->strings.start; 6180 if (!(cursor_push(&builder->strings, (unsigned char*)str, len) && 6181 cursor_push_byte(&builder->strings, '\0'))) { 6182 return 0; 6183 } 6184 6185 *pstr = ndb_offset_str(loc); 6186 6187 // record in builder indices. ignore return value, if we can't cache it 6188 // then whatever 6189 cursor_push_u32(&builder->str_indices, loc); 6190 6191 return 1; 6192 } 6193 6194 static int ndb_builder_push_packed_id(struct ndb_builder *builder, 6195 unsigned char *id, 6196 union ndb_packed_str *pstr) 6197 { 6198 // Don't both find id duplicates. very rarely are they duplicated 6199 // and it slows things down quite a bit. If we really care about this 6200 // We can switch to a hash table. 
6201 //if (ndb_builder_find_str(builder, (const char*)id, 32, pstr)) { 6202 // pstr->packed.flag = NDB_PACKED_ID; 6203 // return 1; 6204 //} 6205 6206 if (ndb_builder_push_str(builder, (const char*)id, 32, pstr)) { 6207 pstr->packed.flag = NDB_PACKED_ID; 6208 return 1; 6209 } 6210 6211 return 0; 6212 } 6213 6214 union ndb_packed_str ndb_chars_to_packed_str(char c1, char c2) 6215 { 6216 union ndb_packed_str str; 6217 str.packed.flag = NDB_PACKED_STR; 6218 str.packed.str[0] = c1; 6219 str.packed.str[1] = c2; 6220 str.packed.str[2] = '\0'; 6221 return str; 6222 } 6223 6224 static union ndb_packed_str ndb_char_to_packed_str(char c) 6225 { 6226 union ndb_packed_str str; 6227 str.packed.flag = NDB_PACKED_STR; 6228 str.packed.str[0] = c; 6229 str.packed.str[1] = '\0'; 6230 return str; 6231 } 6232 6233 6234 /// Check for small strings to pack 6235 static inline int ndb_builder_try_compact_str(struct ndb_builder *builder, 6236 const char *str, int len, 6237 union ndb_packed_str *pstr, 6238 int pack_ids) 6239 { 6240 unsigned char id_buf[32]; 6241 6242 if (len == 0) { 6243 *pstr = ndb_char_to_packed_str(0); 6244 return 1; 6245 } else if (len == 1) { 6246 *pstr = ndb_char_to_packed_str(str[0]); 6247 return 1; 6248 } else if (len == 2) { 6249 *pstr = ndb_chars_to_packed_str(str[0], str[1]); 6250 return 1; 6251 } else if (pack_ids && len == 64 && hex_decode(str, 64, id_buf, 32)) { 6252 return ndb_builder_push_packed_id(builder, id_buf, pstr); 6253 } 6254 6255 return 0; 6256 } 6257 6258 6259 static int ndb_builder_push_unpacked_str(struct ndb_builder *builder, 6260 const char *str, int len, 6261 union ndb_packed_str *pstr) 6262 { 6263 if (ndb_builder_find_str(builder, str, len, pstr)) 6264 return 1; 6265 6266 return ndb_builder_push_str(builder, str, len, pstr); 6267 } 6268 6269 int ndb_builder_make_str(struct ndb_builder *builder, const char *str, int len, 6270 union ndb_packed_str *pstr, int pack_ids) 6271 { 6272 if (ndb_builder_try_compact_str(builder, str, len, pstr, 
pack_ids)) 6273 return 1; 6274 6275 return ndb_builder_push_unpacked_str(builder, str, len, pstr); 6276 } 6277 6278 int ndb_builder_set_content(struct ndb_builder *builder, const char *content, 6279 int len) 6280 { 6281 int pack_ids = 0; 6282 builder->note->content_length = len; 6283 return ndb_builder_make_str(builder, content, len, 6284 &builder->note->content, pack_ids); 6285 } 6286 6287 6288 static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len, 6289 const char *s) 6290 { 6291 if (tok->type == JSMN_STRING && (int)strlen(s) == tok_len && 6292 memcmp(json + tok->start, s, tok_len) == 0) { 6293 return 1; 6294 } 6295 return 0; 6296 } 6297 6298 static int ndb_builder_finalize_tag(struct ndb_builder *builder, 6299 union ndb_packed_str offset) 6300 { 6301 if (!cursor_push_u32(&builder->note_cur, offset.offset)) 6302 return 0; 6303 builder->current_tag->count++; 6304 return 1; 6305 } 6306 6307 /// Unescape and push json strings 6308 static int ndb_builder_make_json_str(struct ndb_builder *builder, 6309 const char *str, int len, 6310 union ndb_packed_str *pstr, 6311 int *written, int pack_ids) 6312 { 6313 // let's not care about de-duping these. we should just unescape 6314 // in-place directly into the strings table. 
6315 if (written) 6316 *written = len; 6317 6318 const char *p, *end, *start; 6319 unsigned char *builder_start; 6320 6321 // always try compact strings first 6322 if (ndb_builder_try_compact_str(builder, str, len, pstr, pack_ids)) 6323 return 1; 6324 6325 end = str + len; 6326 start = str; // Initialize start to the beginning of the string 6327 6328 *pstr = ndb_offset_str(builder->strings.p - builder->strings.start); 6329 builder_start = builder->strings.p; 6330 6331 for (p = str; p < end; p++) { 6332 if (*p == '\\' && p+1 < end) { 6333 // Push the chunk of unescaped characters before this escape sequence 6334 if (start < p && !cursor_push(&builder->strings, 6335 (unsigned char *)start, 6336 p - start)) { 6337 return 0; 6338 } 6339 6340 if (!cursor_push_unescaped_char(&builder->strings, *p, *(p+1))) 6341 return 0; 6342 6343 p++; // Skip the character following the backslash 6344 start = p + 1; // Update the start pointer to the next character 6345 } 6346 } 6347 6348 // Handle the last chunk after the last escape sequence (or if there are no escape sequences at all) 6349 if (start < p && !cursor_push(&builder->strings, (unsigned char *)start, 6350 p - start)) { 6351 return 0; 6352 } 6353 6354 if (written) 6355 *written = builder->strings.p - builder_start; 6356 6357 // TODO: dedupe these!? 
/// Parse the leading run of decimal digits in the `len` bytes at `start`.
///
/// Parsing stops at the first non-digit byte or at the end of the input;
/// whatever follows the digits is ignored. On success the value is stored
/// in `*num` and 1 is returned. Returns 0, leaving `*num` untouched, when
/// there are no leading digits or the value does not fit in an unsigned int.
static int parse_unsigned_int(const char *start, int len, unsigned int *num)
{
	unsigned int acc = 0;
	int i;

	for (i = 0; i < len; i++) {
		unsigned int d;

		if (start[i] < '0' || start[i] > '9')
			break;

		d = (unsigned int)(start[i] - '0');

		// acc * 10 + d would exceed UINT_MAX
		if (acc > (UINT_MAX - d) / 10)
			return 0;

		acc = acc * 10 + d;
	}

	// i counts the digits consumed; none means nothing to parse
	if (i == 0)
		return 0;

	*num = acc;
	return 1;
}
(tok->type != JSMN_STRING) 6513 return 0; 6514 6515 if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) { 6516 tce->evtype = NDB_TCE_EVENT; 6517 6518 tok = &parser.toks[parser.i++]; 6519 if (tok->type != JSMN_STRING) 6520 return 0; 6521 6522 tce->subid = json + tok->start; 6523 tce->subid_len = toksize(tok); 6524 6525 return ndb_parse_json_note(&parser, &ev->note); 6526 } else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) { 6527 tce->evtype = NDB_TCE_EOSE; 6528 6529 tok = &parser.toks[parser.i++]; 6530 if (tok->type != JSMN_STRING) 6531 return 0; 6532 6533 tce->subid = json + tok->start; 6534 tce->subid_len = toksize(tok); 6535 return 1; 6536 } else if (tok_len == 2 && !memcmp("OK", json + tok->start, 2)) { 6537 if (parser.num_tokens != 5) 6538 return 0; 6539 6540 struct ndb_command_result *cr = &tce->command_result; 6541 6542 tce->evtype = NDB_TCE_OK; 6543 6544 tok = &parser.toks[parser.i++]; 6545 if (tok->type != JSMN_STRING) 6546 return 0; 6547 6548 tce->subid = json + tok->start; 6549 tce->subid_len = toksize(tok); 6550 6551 tok = &parser.toks[parser.i++]; 6552 if (tok->type != JSMN_PRIMITIVE || toksize(tok) == 0) 6553 return 0; 6554 6555 cr->ok = (json + tok->start)[0] == 't'; 6556 6557 tok = &parser.toks[parser.i++]; 6558 if (tok->type != JSMN_STRING) 6559 return 0; 6560 6561 tce->command_result.msg = json + tok->start; 6562 tce->command_result.msglen = toksize(tok); 6563 6564 return 1; 6565 } else if (tok_len == 4 && !memcmp("AUTH", json + tok->start, 4)) { 6566 tce->evtype = NDB_TCE_AUTH; 6567 6568 tok = &parser.toks[parser.i++]; 6569 if (tok->type != JSMN_STRING) 6570 return 0; 6571 6572 tce->subid = json + tok->start; 6573 tce->subid_len = toksize(tok); 6574 6575 return 1; 6576 } 6577 6578 return 0; 6579 } 6580 6581 static enum ndb_filter_fieldtype 6582 ndb_filter_parse_field(const char *tok, int len, char *tagchar) 6583 { 6584 *tagchar = 0; 6585 6586 if (len == 0) 6587 return 0; 6588 6589 if (len == 7 && !strncmp(tok, "authors", 7)) 
{ 6590 return NDB_FILTER_AUTHORS; 6591 } else if (len == 3 && !strncmp(tok, "ids", 3)) { 6592 return NDB_FILTER_IDS; 6593 } else if (len == 5 && !strncmp(tok, "kinds", 5)) { 6594 return NDB_FILTER_KINDS; 6595 } else if (len == 2 && tok[0] == '#') { 6596 *tagchar = tok[1]; 6597 return NDB_FILTER_TAGS; 6598 } else if (len == 5 && !strncmp(tok, "since", 5)) { 6599 return NDB_FILTER_SINCE; 6600 } else if (len == 5 && !strncmp(tok, "until", 5)) { 6601 return NDB_FILTER_UNTIL; 6602 } else if (len == 5 && !strncmp(tok, "limit", 5)) { 6603 return NDB_FILTER_LIMIT; 6604 } else if (len == 6 && !strncmp(tok, "search", 6)) { 6605 return NDB_FILTER_SEARCH; 6606 } 6607 6608 return 0; 6609 } 6610 6611 static int ndb_filter_parse_json_ids(struct ndb_json_parser *parser, 6612 struct ndb_filter *filter) 6613 { 6614 jsmntok_t *tok; 6615 const char *start; 6616 unsigned char hexbuf[32]; 6617 int tok_len, i, size; 6618 6619 tok = &parser->toks[parser->i++]; 6620 6621 if (tok->type != JSMN_ARRAY) { 6622 ndb_debug("parse_json_ids: not an array\n"); 6623 return 0; 6624 } 6625 6626 size = tok->size; 6627 6628 for (i = 0; i < size; parser->i++, i++) { 6629 tok = &parser->toks[parser->i]; 6630 start = parser->json + tok->start; 6631 tok_len = toksize(tok); 6632 6633 if (tok->type != JSMN_STRING) { 6634 ndb_debug("parse_json_ids: not a string '%d'\n", tok->type); 6635 return 0; 6636 } 6637 6638 if (tok_len != 64) { 6639 ndb_debug("parse_json_ids: not len 64: '%.*s'\n", tok_len, start); 6640 return 0; 6641 } 6642 6643 // id 6644 if (!hex_decode(start, tok_len, hexbuf, sizeof(hexbuf))) { 6645 ndb_debug("parse_json_ids: hex decode failed\n"); 6646 return 0; 6647 } 6648 6649 ndb_debug("adding id elem\n"); 6650 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6651 ndb_debug("parse_json_ids: failed to add id element\n"); 6652 return 0; 6653 } 6654 } 6655 6656 parser->i--; 6657 return 1; 6658 } 6659 6660 static int ndb_filter_parse_json_elems(struct ndb_json_parser *parser, 6661 struct ndb_filter 
*filter) 6662 { 6663 jsmntok_t *tok; 6664 const char *start; 6665 int tok_len; 6666 unsigned char hexbuf[32]; 6667 enum ndb_generic_element_type typ; 6668 tok = NULL; 6669 int i, size; 6670 6671 tok = &parser->toks[parser->i++]; 6672 6673 if (tok->type != JSMN_ARRAY) 6674 return 0; 6675 6676 size = tok->size; 6677 6678 for (i = 0; i < size; i++, parser->i++) { 6679 tok = &parser->toks[parser->i]; 6680 start = parser->json + tok->start; 6681 tok_len = toksize(tok); 6682 6683 if (tok->type != JSMN_STRING) 6684 return 0; 6685 6686 if (i == 0) { 6687 if (tok_len == 64 && hex_decode(start, 64, hexbuf, sizeof(hexbuf))) { 6688 typ = NDB_ELEMENT_ID; 6689 if (!ndb_filter_add_id_element(filter, hexbuf)) { 6690 ndb_debug("failed to push id elem\n"); 6691 return 0; 6692 } 6693 } else { 6694 typ = NDB_ELEMENT_STRING; 6695 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6696 return 0; 6697 } 6698 } else if (typ == NDB_ELEMENT_ID) { 6699 if (!hex_decode(start, 64, hexbuf, sizeof(hexbuf))) 6700 return 0; 6701 if (!ndb_filter_add_id_element(filter, hexbuf)) 6702 return 0; 6703 } else if (typ == NDB_ELEMENT_STRING) { 6704 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6705 return 0; 6706 } else { 6707 // ??? 
6708 return 0; 6709 } 6710 } 6711 6712 parser->i--; 6713 return 1; 6714 } 6715 6716 static int ndb_filter_parse_json_str(struct ndb_json_parser *parser, 6717 struct ndb_filter *filter) 6718 { 6719 jsmntok_t *tok; 6720 const char *start; 6721 int tok_len; 6722 6723 tok = &parser->toks[parser->i]; 6724 start = parser->json + tok->start; 6725 tok_len = toksize(tok); 6726 6727 if (tok->type != JSMN_STRING) 6728 return 0; 6729 6730 if (!ndb_filter_add_str_element_len(filter, start, tok_len)) 6731 return 0; 6732 6733 ndb_debug("added str elem '%.*s'\n", tok_len, start); 6734 6735 return 1; 6736 } 6737 6738 static int ndb_filter_parse_json_int(struct ndb_json_parser *parser, 6739 struct ndb_filter *filter) 6740 { 6741 jsmntok_t *tok; 6742 const char *start; 6743 int tok_len; 6744 unsigned int value; 6745 6746 tok = &parser->toks[parser->i]; 6747 start = parser->json + tok->start; 6748 tok_len = toksize(tok); 6749 6750 if (tok->type != JSMN_PRIMITIVE) 6751 return 0; 6752 6753 if (!parse_unsigned_int(start, tok_len, &value)) 6754 return 0; 6755 6756 if (!ndb_filter_add_int_element(filter, (uint64_t)value)) 6757 return 0; 6758 6759 ndb_debug("added int elem %d\n", value); 6760 6761 return 1; 6762 } 6763 6764 6765 static int ndb_filter_parse_json_ints(struct ndb_json_parser *parser, 6766 struct ndb_filter *filter) 6767 { 6768 jsmntok_t *tok; 6769 int size, i; 6770 6771 tok = &parser->toks[parser->i++]; 6772 6773 if (tok->type != JSMN_ARRAY) 6774 return 0; 6775 6776 size = tok->size; 6777 6778 for (i = 0; i < size; parser->i++, i++) { 6779 if (!ndb_filter_parse_json_int(parser, filter)) 6780 return 0; 6781 } 6782 6783 parser->i--; 6784 return 1; 6785 } 6786 6787 6788 static int ndb_filter_parse_json(struct ndb_json_parser *parser, 6789 struct ndb_filter *filter) 6790 { 6791 jsmntok_t *tok = NULL; 6792 const char *json = parser->json; 6793 const char *start; 6794 char tag; 6795 int tok_len; 6796 enum ndb_filter_fieldtype field; 6797 6798 if (parser->toks[parser->i++].type != 
JSMN_OBJECT) 6799 return 0; 6800 6801 for (; parser->i < parser->num_tokens; parser->i++) { 6802 tok = &parser->toks[parser->i++]; 6803 start = json + tok->start; 6804 tok_len = toksize(tok); 6805 6806 if (!(field = ndb_filter_parse_field(start, tok_len, &tag))) { 6807 ndb_debug("failed field '%.*s'\n", tok_len, start); 6808 continue; 6809 } 6810 6811 if (tag) { 6812 ndb_debug("starting tag field '%c'\n", tag); 6813 if (!ndb_filter_start_tag_field(filter, tag)) { 6814 ndb_debug("failed to start tag field '%c'\n", tag); 6815 return 0; 6816 } 6817 } else if (!ndb_filter_start_field(filter, field)) { 6818 ndb_debug("field already started\n"); 6819 return 0; 6820 } 6821 6822 // we parsed a top-level field 6823 switch(field) { 6824 case NDB_FILTER_AUTHORS: 6825 case NDB_FILTER_IDS: 6826 if (!ndb_filter_parse_json_ids(parser, filter)) { 6827 ndb_debug("failed to parse filter ids/authors\n"); 6828 return 0; 6829 } 6830 break; 6831 case NDB_FILTER_SEARCH: 6832 if (!ndb_filter_parse_json_str(parser, filter)) { 6833 ndb_debug("failed to parse filter search str\n"); 6834 return 0; 6835 } 6836 break; 6837 case NDB_FILTER_SINCE: 6838 case NDB_FILTER_UNTIL: 6839 case NDB_FILTER_LIMIT: 6840 if (!ndb_filter_parse_json_int(parser, filter)) { 6841 ndb_debug("failed to parse filter since/until/limit\n"); 6842 return 0; 6843 } 6844 break; 6845 case NDB_FILTER_KINDS: 6846 if (!ndb_filter_parse_json_ints(parser, filter)) { 6847 ndb_debug("failed to parse filter kinds\n"); 6848 return 0; 6849 } 6850 break; 6851 case NDB_FILTER_TAGS: 6852 if (!ndb_filter_parse_json_elems(parser, filter)) { 6853 ndb_debug("failed to parse filter tags\n"); 6854 return 0; 6855 } 6856 break; 6857 } 6858 6859 ndb_filter_end_field(filter); 6860 } 6861 6862 return ndb_filter_end(filter); 6863 } 6864 6865 int ndb_parse_json_note(struct ndb_json_parser *parser, struct ndb_note **note) 6866 { 6867 jsmntok_t *tok = NULL; 6868 unsigned char hexbuf[64]; 6869 const char *json = parser->json; 6870 const char *start; 
6871 int i, tok_len, parsed; 6872 6873 parsed = 0; 6874 6875 if (parser->toks[parser->i].type != JSMN_OBJECT) 6876 return 0; 6877 6878 // TODO: build id buffer and verify at end 6879 6880 for (i = parser->i + 1; i < parser->num_tokens; i++) { 6881 tok = &parser->toks[i]; 6882 start = json + tok->start; 6883 tok_len = toksize(tok); 6884 6885 //printf("toplevel %.*s %d\n", tok_len, json + tok->start, tok->type); 6886 if (tok_len == 0 || i + 1 >= parser->num_tokens) 6887 continue; 6888 6889 if (start[0] == 'p' && jsoneq(json, tok, tok_len, "pubkey")) { 6890 // pubkey 6891 tok = &parser->toks[i+1]; 6892 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6893 parsed |= NDB_PARSED_PUBKEY; 6894 ndb_builder_set_pubkey(&parser->builder, hexbuf); 6895 } else if (tok_len == 2 && start[0] == 'i' && start[1] == 'd') { 6896 // id 6897 tok = &parser->toks[i+1]; 6898 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6899 parsed |= NDB_PARSED_ID; 6900 ndb_builder_set_id(&parser->builder, hexbuf); 6901 } else if (tok_len == 3 && start[0] == 's' && start[1] == 'i' && start[2] == 'g') { 6902 // sig 6903 tok = &parser->toks[i+1]; 6904 hex_decode(json + tok->start, toksize(tok), hexbuf, sizeof(hexbuf)); 6905 parsed |= NDB_PARSED_SIG; 6906 ndb_builder_set_sig(&parser->builder, hexbuf); 6907 } else if (start[0] == 'k' && jsoneq(json, tok, tok_len, "kind")) { 6908 // kind 6909 tok = &parser->toks[i+1]; 6910 start = json + tok->start; 6911 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6912 return 0; 6913 if (!parse_unsigned_int(start, toksize(tok), 6914 &parser->builder.note->kind)) 6915 return 0; 6916 parsed |= NDB_PARSED_KIND; 6917 } else if (start[0] == 'c') { 6918 if (jsoneq(json, tok, tok_len, "created_at")) { 6919 // created_at 6920 tok = &parser->toks[i+1]; 6921 start = json + tok->start; 6922 if (tok->type != JSMN_PRIMITIVE || tok_len <= 0) 6923 return 0; 6924 // TODO: update to int64 in 2106 ... 
xD 6925 unsigned int bigi; 6926 if (!parse_unsigned_int(start, toksize(tok), &bigi)) 6927 return 0; 6928 parser->builder.note->created_at = bigi; 6929 parsed |= NDB_PARSED_CREATED_AT; 6930 } else if (jsoneq(json, tok, tok_len, "content")) { 6931 // content 6932 tok = &parser->toks[i+1]; 6933 union ndb_packed_str pstr; 6934 tok_len = toksize(tok); 6935 int written, pack_ids = 0; 6936 if (!ndb_builder_make_json_str(&parser->builder, 6937 json + tok->start, 6938 tok_len, &pstr, 6939 &written, pack_ids)) { 6940 ndb_debug("ndb_builder_make_json_str failed\n"); 6941 return 0; 6942 } 6943 parser->builder.note->content_length = written; 6944 parser->builder.note->content = pstr; 6945 parsed |= NDB_PARSED_CONTENT; 6946 } 6947 } else if (start[0] == 't' && jsoneq(json, tok, tok_len, "tags")) { 6948 tok = &parser->toks[i+1]; 6949 ndb_builder_process_json_tags(parser, tok); 6950 i += tok->size; 6951 parsed |= NDB_PARSED_TAGS; 6952 } 6953 } 6954 6955 //ndb_debug("parsed %d = %d, &->%d", parsed, NDB_PARSED_ALL, parsed & NDB_PARSED_ALL); 6956 if (parsed != NDB_PARSED_ALL) 6957 return 0; 6958 6959 return ndb_builder_finalize(&parser->builder, note, NULL); 6960 } 6961 6962 int ndb_filter_from_json(const char *json, int len, struct ndb_filter *filter, 6963 unsigned char *buf, int bufsize) 6964 { 6965 struct ndb_json_parser parser; 6966 int res; 6967 6968 if (filter->finalized) 6969 return 0; 6970 6971 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6972 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6973 return res; 6974 6975 if (parser.num_tokens < 1) 6976 return 0; 6977 6978 return ndb_filter_parse_json(&parser, filter); 6979 } 6980 6981 int ndb_note_from_json(const char *json, int len, struct ndb_note **note, 6982 unsigned char *buf, int bufsize) 6983 { 6984 struct ndb_json_parser parser; 6985 int res; 6986 6987 ndb_json_parser_init(&parser, json, len, buf, bufsize); 6988 if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) 6989 return res; 6990 6991 if 
(parser.num_tokens < 1) 6992 return 0; 6993 6994 return ndb_parse_json_note(&parser, note); 6995 } 6996 6997 void ndb_builder_set_pubkey(struct ndb_builder *builder, unsigned char *pubkey) 6998 { 6999 memcpy(builder->note->pubkey, pubkey, 32); 7000 } 7001 7002 void ndb_builder_set_id(struct ndb_builder *builder, unsigned char *id) 7003 { 7004 memcpy(builder->note->id, id, 32); 7005 } 7006 7007 void ndb_builder_set_sig(struct ndb_builder *builder, unsigned char *sig) 7008 { 7009 memcpy(builder->note->sig, sig, 64); 7010 } 7011 7012 void ndb_builder_set_kind(struct ndb_builder *builder, uint32_t kind) 7013 { 7014 builder->note->kind = kind; 7015 } 7016 7017 void ndb_builder_set_created_at(struct ndb_builder *builder, uint64_t created_at) 7018 { 7019 builder->note->created_at = created_at; 7020 } 7021 7022 int ndb_builder_new_tag(struct ndb_builder *builder) 7023 { 7024 builder->note->tags.count++; 7025 struct ndb_tag tag = {0}; 7026 builder->current_tag = (struct ndb_tag *)builder->note_cur.p; 7027 return cursor_push_tag(&builder->note_cur, &tag); 7028 } 7029 7030 void ndb_stat_counts_init(struct ndb_stat_counts *counts) 7031 { 7032 counts->count = 0; 7033 counts->key_size = 0; 7034 counts->value_size = 0; 7035 } 7036 7037 static void ndb_stat_init(struct ndb_stat *stat) 7038 { 7039 // init stats 7040 int i; 7041 7042 for (i = 0; i < NDB_CKIND_COUNT; i++) { 7043 ndb_stat_counts_init(&stat->common_kinds[i]); 7044 } 7045 7046 for (i = 0; i < NDB_DBS; i++) { 7047 ndb_stat_counts_init(&stat->dbs[i]); 7048 } 7049 7050 ndb_stat_counts_init(&stat->other_kinds); 7051 } 7052 7053 int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) 7054 { 7055 int rc; 7056 MDB_cursor *cur; 7057 MDB_val k, v; 7058 MDB_dbi db; 7059 struct ndb_txn txn; 7060 struct ndb_note *note; 7061 int i; 7062 enum ndb_common_kind common_kind; 7063 7064 // initialize to 0 7065 ndb_stat_init(stat); 7066 7067 if (!ndb_begin_query(ndb, &txn)) { 7068 fprintf(stderr, "ndb_stat failed at ndb_begin_query\n"); 7069 
return 0; 7070 } 7071 7072 // stat each dbi in the database 7073 for (i = 0; i < NDB_DBS; i++) 7074 { 7075 db = ndb->lmdb.dbs[i]; 7076 7077 if ((rc = mdb_cursor_open(txn.mdb_txn, db, &cur))) { 7078 fprintf(stderr, "ndb_stat: mdb_cursor_open failed, error '%s'\n", 7079 mdb_strerror(rc)); 7080 return 0; 7081 } 7082 7083 // loop over every entry and count kv sizes 7084 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 7085 // we gather more detailed per-kind stats if we're in 7086 // the notes db 7087 if (i == NDB_DB_NOTE) { 7088 note = v.mv_data; 7089 common_kind = ndb_kind_to_common_kind(note->kind); 7090 7091 // uncommon kind? just count them in bulk 7092 if ((int)common_kind == -1) { 7093 stat->other_kinds.count++; 7094 stat->other_kinds.key_size += k.mv_size; 7095 stat->other_kinds.value_size += v.mv_size; 7096 } else { 7097 stat->common_kinds[common_kind].count++; 7098 stat->common_kinds[common_kind].key_size += k.mv_size; 7099 stat->common_kinds[common_kind].value_size += v.mv_size; 7100 } 7101 } 7102 7103 stat->dbs[i].count++; 7104 stat->dbs[i].key_size += k.mv_size; 7105 stat->dbs[i].value_size += v.mv_size; 7106 } 7107 7108 // close the cursor, they are per-dbi 7109 mdb_cursor_close(cur); 7110 } 7111 7112 ndb_end_query(&txn); 7113 7114 return 1; 7115 } 7116 7117 /// Push an element to the current tag 7118 /// 7119 /// Basic idea is to call ndb_builder_new_tag 7120 int ndb_builder_push_tag_str(struct ndb_builder *builder, 7121 const char *str, int len) 7122 { 7123 union ndb_packed_str pstr; 7124 int pack_ids = 1; 7125 if (!ndb_builder_make_str(builder, str, len, &pstr, pack_ids)) 7126 return 0; 7127 return ndb_builder_finalize_tag(builder, pstr); 7128 } 7129 7130 // 7131 // CONFIG 7132 // 7133 void ndb_default_config(struct ndb_config *config) 7134 { 7135 int cores = get_cpu_cores(); 7136 config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB 7137 config->ingester_threads = cores == -1 ? 
4 : cores; 7138 config->flags = 0; 7139 config->ingest_filter = NULL; 7140 config->filter_context = NULL; 7141 config->sub_cb_ctx = NULL; 7142 config->sub_cb = NULL; 7143 config->writer_scratch_buffer_size = DEFAULT_WRITER_SCRATCH_SIZE; 7144 } 7145 7146 void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) 7147 { 7148 config->sub_cb_ctx = context; 7149 config->sub_cb = fn; 7150 } 7151 7152 void ndb_config_set_writer_scratch_buffer_size(struct ndb_config *config, int scratch_size) 7153 { 7154 config->writer_scratch_buffer_size = scratch_size; 7155 } 7156 7157 void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) 7158 { 7159 config->ingester_threads = threads; 7160 } 7161 7162 void ndb_config_set_flags(struct ndb_config *config, int flags) 7163 { 7164 config->flags = flags; 7165 } 7166 7167 void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize) 7168 { 7169 config->mapsize = mapsize; 7170 } 7171 7172 void ndb_config_set_ingest_filter(struct ndb_config *config, 7173 ndb_ingest_filter_fn fn, void *filter_ctx) 7174 { 7175 config->ingest_filter = fn; 7176 config->filter_context = filter_ctx; 7177 } 7178 7179 int ndb_print_relay_kind_index(struct ndb_txn *txn) 7180 { 7181 MDB_cursor *cur; 7182 MDB_val k, v; 7183 int i; 7184 7185 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], &cur)) 7186 return 0; 7187 7188 i = 1; 7189 printf("relay\tkind\tcreated_at\tnote_id\n"); 7190 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 7191 printf("%s\t", (const char *)(k.mv_data + 25)); 7192 printf("%" PRIu64 "\t", *(uint64_t*)(k.mv_data + 8)); 7193 printf("%" PRIu64 "\t", *(uint64_t*)(k.mv_data + 16)); 7194 printf("%" PRIu64 "\n", *(uint64_t*)(k.mv_data + 0)); 7195 i++; 7196 } 7197 7198 return i; 7199 } 7200 7201 int ndb_print_tag_index(struct ndb_txn *txn) 7202 { 7203 MDB_cursor *cur; 7204 MDB_val k, v; 7205 int i; 7206 7207 if (mdb_cursor_open(txn->mdb_txn, 
txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur)) 7208 return 0; 7209 7210 i = 1; 7211 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 7212 printf("%d ", i); 7213 print_tag_kv(txn, &k, &v); 7214 i++; 7215 } 7216 7217 return 1; 7218 } 7219 7220 int ndb_print_kind_keys(struct ndb_txn *txn) 7221 { 7222 MDB_cursor *cur; 7223 MDB_val k, v; 7224 int i; 7225 struct ndb_u64_ts *tsid; 7226 7227 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_KIND], &cur)) 7228 return 0; 7229 7230 i = 1; 7231 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 7232 tsid = k.mv_data; 7233 printf("%d note_kind %" PRIu64 " %" PRIu64 "\n", 7234 i, tsid->u64, tsid->timestamp); 7235 7236 i++; 7237 } 7238 7239 return 1; 7240 } 7241 7242 // used by ndb.c 7243 int ndb_print_search_keys(struct ndb_txn *txn) 7244 { 7245 MDB_cursor *cur; 7246 MDB_val k, v; 7247 int i; 7248 struct ndb_text_search_key search_key; 7249 7250 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TEXT], &cur)) 7251 return 0; 7252 7253 i = 1; 7254 while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) { 7255 if (!ndb_unpack_text_search_key(k.mv_data, k.mv_size, &search_key)) { 7256 fprintf(stderr, "error decoding key %d\n", i); 7257 continue; 7258 } 7259 7260 ndb_print_text_search_key(k.mv_size, &search_key); 7261 printf("\n"); 7262 7263 i++; 7264 } 7265 7266 return 1; 7267 } 7268 7269 struct ndb_tags *ndb_note_tags(struct ndb_note *note) 7270 { 7271 return ¬e->tags; 7272 } 7273 7274 struct ndb_str ndb_note_str(struct ndb_note *note, union ndb_packed_str *pstr) 7275 { 7276 struct ndb_str str; 7277 str.flag = pstr->packed.flag; 7278 7279 if (str.flag == NDB_PACKED_STR) { 7280 str.str = pstr->packed.str; 7281 return str; 7282 } 7283 7284 str.str = ((const char *)note) + note->strings + (pstr->offset & 0xFFFFFF); 7285 return str; 7286 } 7287 7288 struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind) 7289 { 7290 return ndb_note_str(note, &tag->strs[ind]); 7291 } 7292 7293 int 
ndb_str_len(struct ndb_str *str) 7294 { 7295 if (str->flag == NDB_PACKED_ID) 7296 return 32; 7297 return strlen(str->str); 7298 } 7299 7300 struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind) 7301 { 7302 return ndb_tag_str(iter->note, iter->tag, ind); 7303 } 7304 7305 unsigned char * ndb_note_id(struct ndb_note *note) 7306 { 7307 return note->id; 7308 } 7309 7310 unsigned char * ndb_note_pubkey(struct ndb_note *note) 7311 { 7312 return note->pubkey; 7313 } 7314 7315 unsigned char * ndb_note_sig(struct ndb_note *note) 7316 { 7317 return note->sig; 7318 } 7319 7320 uint32_t ndb_note_created_at(struct ndb_note *note) 7321 { 7322 return note->created_at; 7323 } 7324 7325 uint32_t ndb_note_kind(struct ndb_note *note) 7326 { 7327 return note->kind; 7328 } 7329 7330 void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind) 7331 { 7332 note->kind = kind; 7333 } 7334 7335 const char *ndb_note_content(struct ndb_note *note) 7336 { 7337 return ndb_note_str(note, ¬e->content).str; 7338 } 7339 7340 uint32_t ndb_note_content_length(struct ndb_note *note) 7341 { 7342 return note->content_length; 7343 } 7344 7345 struct ndb_note * ndb_note_from_bytes(unsigned char *bytes) 7346 { 7347 struct ndb_note *note = (struct ndb_note *)bytes; 7348 if (note->version != 1) 7349 return 0; 7350 return note; 7351 } 7352 7353 int ndb_note_relay_iterate_start(struct ndb_txn *txn, 7354 struct ndb_note_relay_iterator *iter, 7355 uint64_t note_key) 7356 { 7357 if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], 7358 (MDB_cursor**)&iter->mdb_cur)) { 7359 return 0; 7360 } 7361 7362 iter->txn = txn; 7363 iter->cursor_op = MDB_SET_KEY; 7364 iter->note_key = note_key; 7365 7366 return 1; 7367 } 7368 7369 const char *ndb_note_relay_iterate_next(struct ndb_note_relay_iterator *iter) 7370 { 7371 int rc; 7372 MDB_val k, v; 7373 7374 if (iter->mdb_cur == NULL) 7375 return NULL; 7376 7377 k.mv_data = &iter->note_key; 7378 k.mv_size = sizeof(iter->note_key); 7379 7380 if 
((rc = mdb_cursor_get((MDB_cursor *)iter->mdb_cur, &k, &v, 7381 (MDB_cursor_op)iter->cursor_op))) 7382 { 7383 //fprintf(stderr, "autoclosing %d '%s'\n", iter->cursor_op, mdb_strerror(rc)); 7384 // autoclose 7385 ndb_note_relay_iterate_close(iter); 7386 return NULL; 7387 } 7388 7389 iter->cursor_op = MDB_NEXT_DUP; 7390 7391 return (const char*)v.mv_data; 7392 } 7393 7394 void ndb_note_relay_iterate_close(struct ndb_note_relay_iterator *iter) 7395 { 7396 if (!iter || iter->mdb_cur == NULL) 7397 return; 7398 7399 mdb_cursor_close((MDB_cursor*)iter->mdb_cur); 7400 7401 iter->mdb_cur = NULL; 7402 } 7403 7404 void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) 7405 { 7406 iter->note = note; 7407 iter->tag = NULL; 7408 iter->index = -1; 7409 } 7410 7411 // Helper function to get a pointer to the nth tag 7412 static struct ndb_tag *ndb_tags_tag(struct ndb_tags *tags, size_t index) { 7413 return (struct ndb_tag *)((uint8_t *)tags + sizeof(struct ndb_tags) + index * sizeof(struct ndb_tag)); 7414 } 7415 7416 int ndb_tags_iterate_next(struct ndb_iterator *iter) 7417 { 7418 struct ndb_tags *tags; 7419 7420 if (iter->tag == NULL || iter->index == -1) { 7421 iter->tag = ndb_tags_tag(&iter->note->tags, 0); 7422 iter->index = 0; 7423 return iter->note->tags.count != 0; 7424 } 7425 7426 tags = &iter->note->tags; 7427 7428 if (++iter->index < tags->count) { 7429 uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]); 7430 iter->tag = (struct ndb_tag *)(iter->tag->strs[0].bytes + tag_data_size); 7431 return 1; 7432 } 7433 7434 return 0; 7435 } 7436 7437 uint16_t ndb_tags_count(struct ndb_tags *tags) 7438 { 7439 return tags->count; 7440 } 7441 7442 uint16_t ndb_tag_count(struct ndb_tag *tags) 7443 { 7444 return tags->count; 7445 } 7446 7447 enum ndb_common_kind ndb_kind_to_common_kind(int kind) 7448 { 7449 switch (kind) 7450 { 7451 case 0: return NDB_CKIND_PROFILE; 7452 case 1: return NDB_CKIND_TEXT; 7453 case 3: return NDB_CKIND_CONTACTS; 7454 
case 4: return NDB_CKIND_DM; 7455 case 5: return NDB_CKIND_DELETE; 7456 case 6: return NDB_CKIND_REPOST; 7457 case 7: return NDB_CKIND_REACTION; 7458 case 9735: return NDB_CKIND_ZAP; 7459 case 9734: return NDB_CKIND_ZAP_REQUEST; 7460 case 23194: return NDB_CKIND_NWC_REQUEST; 7461 case 23195: return NDB_CKIND_NWC_RESPONSE; 7462 case 27235: return NDB_CKIND_HTTP_AUTH; 7463 case 30000: return NDB_CKIND_LIST; 7464 case 30023: return NDB_CKIND_LONGFORM; 7465 case 30315: return NDB_CKIND_STATUS; 7466 } 7467 7468 return -1; 7469 } 7470 7471 const char *ndb_kind_name(enum ndb_common_kind ck) 7472 { 7473 switch (ck) { 7474 case NDB_CKIND_PROFILE: return "profile"; 7475 case NDB_CKIND_TEXT: return "text"; 7476 case NDB_CKIND_CONTACTS: return "contacts"; 7477 case NDB_CKIND_DM: return "dm"; 7478 case NDB_CKIND_DELETE: return "delete"; 7479 case NDB_CKIND_REPOST: return "repost"; 7480 case NDB_CKIND_REACTION: return "reaction"; 7481 case NDB_CKIND_ZAP: return "zap"; 7482 case NDB_CKIND_ZAP_REQUEST: return "zap_request"; 7483 case NDB_CKIND_NWC_REQUEST: return "nwc_request"; 7484 case NDB_CKIND_NWC_RESPONSE: return "nwc_response"; 7485 case NDB_CKIND_HTTP_AUTH: return "http_auth"; 7486 case NDB_CKIND_LIST: return "list"; 7487 case NDB_CKIND_LONGFORM: return "longform"; 7488 case NDB_CKIND_STATUS: return "status"; 7489 case NDB_CKIND_COUNT: return "unknown"; 7490 } 7491 7492 return "unknown"; 7493 } 7494 7495 const char *ndb_db_name(enum ndb_dbs db) 7496 { 7497 switch (db) { 7498 case NDB_DB_NOTE: 7499 return "note"; 7500 case NDB_DB_META: 7501 return "note_metadata"; 7502 case NDB_DB_PROFILE: 7503 return "profile"; 7504 case NDB_DB_NOTE_ID: 7505 return "note_index"; 7506 case NDB_DB_PROFILE_PK: 7507 return "profile_pubkey_index"; 7508 case NDB_DB_NDB_META: 7509 return "nostrdb_metadata"; 7510 case NDB_DB_PROFILE_SEARCH: 7511 return "profile_search"; 7512 case NDB_DB_PROFILE_LAST_FETCH: 7513 return "profile_last_fetch"; 7514 case NDB_DB_NOTE_KIND: 7515 return "note_kind_index"; 
7516 case NDB_DB_NOTE_TEXT: 7517 return "note_fulltext"; 7518 case NDB_DB_NOTE_BLOCKS: 7519 return "note_blocks"; 7520 case NDB_DB_NOTE_TAGS: 7521 return "note_tags"; 7522 case NDB_DB_NOTE_PUBKEY: 7523 return "note_pubkey_index"; 7524 case NDB_DB_NOTE_PUBKEY_KIND: 7525 return "note_pubkey_kind_index"; 7526 case NDB_DB_NOTE_RELAY_KIND: 7527 return "note_relay_kind_index"; 7528 case NDB_DB_NOTE_RELAYS: 7529 return "note_relays"; 7530 case NDB_DBS: 7531 return "count"; 7532 } 7533 7534 return "unknown"; 7535 } 7536 7537 static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note) 7538 { 7539 const char *content; 7540 size_t content_len; 7541 struct ndb_blocks *blocks; 7542 7543 content = ndb_note_content(note); 7544 content_len = ndb_note_content_length(note); 7545 7546 // something weird is going on 7547 if (content_len >= INT32_MAX) 7548 return NULL; 7549 7550 unsigned char *buffer = malloc(content_len); 7551 if (!buffer) 7552 return NULL; 7553 7554 if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) { 7555 free(buffer); 7556 return NULL; 7557 } 7558 7559 //blocks = realloc(blocks, ndb_blocks_total_size(blocks)); 7560 //if (blocks == NULL) 7561 //return NULL; 7562 7563 blocks->flags |= NDB_BLOCK_FLAG_OWNED; 7564 7565 return blocks; 7566 } 7567 7568 struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key) 7569 { 7570 struct ndb_blocks *blocks, *blocks_to_writer; 7571 size_t blocks_size; 7572 struct ndb_note *note; 7573 size_t note_len; 7574 7575 if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, ¬e_len))) { 7576 return blocks; 7577 } 7578 7579 // If we don't have note blocks, let's lazily generate them. 
This is 7580 // migration-friendly instead of doing them all at once 7581 if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_len))) { 7582 // no note found, can't return note blocks 7583 return NULL; 7584 } 7585 7586 if (!(blocks = ndb_note_to_blocks(note))) 7587 return NULL; 7588 7589 // send a copy to the writer 7590 blocks_size = ndb_blocks_total_size(blocks); 7591 blocks_to_writer = malloc(blocks_size); 7592 memcpy(blocks_to_writer, blocks, blocks_size); 7593 assert(blocks->flags & NDB_BLOCK_FLAG_OWNED); 7594 7595 // we generated new blocks, let's store them in the DB 7596 struct ndb_writer_blocks write_blocks = { 7597 .blocks = blocks_to_writer, 7598 .note_key = note_key 7599 }; 7600 7601 assert(write_blocks.blocks != blocks); 7602 7603 struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS }; 7604 msg.blocks = write_blocks; 7605 7606 ndb_writer_queue_msg(&ndb->writer, &msg); 7607 7608 return blocks; 7609 } 7610 7611 // please call ndb_monitor_lock before calling this 7612 static struct ndb_subscription * 7613 ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index) 7614 { 7615 struct ndb_subscription *sub, *tsub; 7616 int i; 7617 7618 for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) { 7619 tsub = &monitor->subscriptions[i]; 7620 if (tsub->subid == subid) { 7621 sub = tsub; 7622 if (index) 7623 *index = i; 7624 break; 7625 } 7626 } 7627 7628 return sub; 7629 } 7630 7631 int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, 7632 int note_id_capacity) 7633 { 7634 struct ndb_subscription *sub; 7635 int res; 7636 7637 if (subid == 0) 7638 return 0; 7639 7640 ndb_monitor_lock(&ndb->monitor); 7641 7642 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) 7643 res = 0; 7644 else 7645 res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); 7646 7647 ndb_monitor_unlock(&ndb->monitor); 7648 7649 return res; 7650 } 7651 7652 int ndb_wait_for_notes(struct ndb *ndb, uint64_t 
subid, uint64_t *note_ids, 7653 int note_id_capacity) 7654 { 7655 struct ndb_subscription *sub; 7656 struct prot_queue *queue_inbox; 7657 7658 // this is not a valid subscription id 7659 if (subid == 0) 7660 return 0; 7661 7662 ndb_monitor_lock(&ndb->monitor); 7663 7664 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) { 7665 ndb_monitor_unlock(&ndb->monitor); 7666 return 0; 7667 } 7668 7669 queue_inbox = &sub->inbox; 7670 7671 ndb_monitor_unlock(&ndb->monitor); 7672 7673 // there is technically a race condition if the thread yeilds at this 7674 // comment and a subscription is added/removed. A deadlock in the 7675 // writer queue would be much worse though. This function is dubious 7676 // anyways. 7677 7678 return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity); 7679 } 7680 7681 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) 7682 { 7683 struct ndb_subscription *sub; 7684 int index, res, elems_to_move; 7685 7686 ndb_monitor_lock(&ndb->monitor); 7687 7688 if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) { 7689 res = 0; 7690 goto done; 7691 } 7692 7693 ndb_subscription_destroy(sub); 7694 7695 elems_to_move = (--ndb->monitor.num_subscriptions) - index; 7696 7697 memmove(&ndb->monitor.subscriptions[index], 7698 &ndb->monitor.subscriptions[index+1], 7699 elems_to_move * sizeof(*sub)); 7700 7701 res = 1; 7702 7703 done: 7704 ndb_monitor_unlock(&ndb->monitor); 7705 7706 return res; 7707 } 7708 7709 int ndb_num_subscriptions(struct ndb *ndb) 7710 { 7711 return ndb->monitor.num_subscriptions; 7712 } 7713 7714 uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters) 7715 { 7716 static uint64_t subids = 0; 7717 struct ndb_subscription *sub; 7718 size_t buflen; 7719 uint64_t subid; 7720 char *buf; 7721 7722 ndb_monitor_lock(&ndb->monitor); 7723 7724 if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) { 7725 fprintf(stderr, "too many subscriptions\n"); 7726 subid = 0; 7727 
goto done; 7728 } 7729 7730 sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions]; 7731 subid = ++subids; 7732 sub->subid = subid; 7733 7734 ndb_filter_group_init(&sub->group); 7735 if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) { 7736 subid = 0; 7737 goto done; 7738 } 7739 7740 // 500k ought to be enough for anyone 7741 buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE; 7742 buf = malloc(buflen); 7743 7744 if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) { 7745 fprintf(stderr, "failed to push prot queue\n"); 7746 subid = 0; 7747 goto done; 7748 } 7749 7750 ndb->monitor.num_subscriptions++; 7751 done: 7752 ndb_monitor_unlock(&ndb->monitor); 7753 7754 return subid; 7755 }