nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit e87228d19ad6965fd143bdf9a962f7d904fc4e33
parent 181d8d5a9b352a1f88a0cffdf7652c9a4b386e89
Author: William Casarin <jb55@jb55.com>
Date:   Wed,  9 Aug 2023 22:21:52 -0700

insane optimization: check id existence before parsing entire json

Diffstat:
Mhex.h | 4++--
Mjsmn.h | 24++++++++++++++++++++----
Mnostrdb.c | 126+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Mnostrdb.h | 11++++++++++-
Mtest.c | 12++++++------
Mthreadpool.h | 2+-
6 files changed, 127 insertions(+), 52 deletions(-)

diff --git a/hex.h b/hex.h @@ -4,7 +4,7 @@ #include <stdlib.h> -static int char_to_hex(unsigned char *val, char c) +static inline int char_to_hex(unsigned char *val, char c) { if (c >= '0' && c <= '9') { *val = c - '0'; @@ -21,7 +21,7 @@ static int char_to_hex(unsigned char *val, char c) return 0; } -static int hex_decode(const char *str, size_t slen, void *buf, size_t bufsize) +static inline int hex_decode(const char *str, size_t slen, void *buf, size_t bufsize) { unsigned char v1, v2; unsigned char *p = buf; diff --git a/jsmn.h b/jsmn.h @@ -100,7 +100,7 @@ JSMN_API void jsmn_init(jsmn_parser *parser); * a single JSON object. */ JSMN_API int jsmn_parse(jsmn_parser *parser, const char *js, const size_t len, - jsmntok_t *tokens, const unsigned int num_tokens); + jsmntok_t *tokens, const unsigned int num_tokens, int stop_at_id); #ifndef JSMN_HEADER /** @@ -269,12 +269,13 @@ static int jsmn_parse_string(jsmn_parser *parser, const char *js, * Parse JSON string and fill tokens. */ JSMN_API int jsmn_parse(jsmn_parser *parser, const char *js, const size_t len, - jsmntok_t *tokens, const unsigned int num_tokens) { - int r; - int i; + jsmntok_t *tokens, const unsigned int num_tokens, int stop_at_id) { + int r, i, idkey; jsmntok_t *token; int count = parser->toknext; + idkey = 0; + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { char c; jsmntype_t type; @@ -370,6 +371,21 @@ JSMN_API int jsmn_parse(jsmn_parser *parser, const char *js, const size_t len, if (parser->toksuper != -1 && tokens != NULL) { tokens[parser->toksuper].size++; } + + // big hack. resumable parsing when encountering the id field + if (stop_at_id) { + token = &tokens[parser->toknext-1]; + if (idkey == 1 && (token->end - token->start) == 64) { + //printf("jsmn: found id '%.*s'\n", token->end - token->start, js + token->start); + return -42; + } else if (idkey == 0 && (token->end - token->start) == 2 && + (js + token->start)[0] == 'i' && + (js + token->start)[1] == 'd') { + //printf("jsmn: found id key\n"); + idkey = 1; + } + } + break; case '\t': case '\r': diff --git a/nostrdb.c b/nostrdb.c @@ -23,6 +23,20 @@ static const int THREAD_QUEUE_BATCH = 1024; // the maximum size of inbox queues static const int DEFAULT_QUEUE_SIZE = 50000; + +// controls whether to continue or stop the json parser +enum ndb_idres { + NDB_IDRES_CONT, + NDB_IDRES_STOP, +}; + +// closure data for the id-detecting ingest controller +struct ndb_ingest_controller +{ + MDB_txn *read_txn; + struct ndb_lmdb *lmdb; +}; + enum ndb_dbs { NDB_DBI_ID, }; @@ -143,6 +157,25 @@ static int ndb_writer_queue_note(struct ndb_writer *writer, return prot_queue_push(&writer->inbox, &msg); } +static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid) +{ + unsigned char id[32]; + struct ndb_ingest_controller *c = data; + int rc; + + hex_decode(hexid, 64, id, sizeof(id)); + + // let's see if we already have it + MDB_val key, val; + key.mv_size = 32; + key.mv_data = id; + rc = mdb_get(c->read_txn, c->lmdb->dbis[NDB_DBI_ID], &key, &val); + + if (rc == MDB_NOTFOUND) + return NDB_IDRES_CONT; + + return NDB_IDRES_STOP; +} static int ndb_ingester_process_event(secp256k1_context *ctx, struct ndb_ingester *ingester, @@ -153,13 +186,17 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, { struct ndb_tce tce; struct ndb_note *note; - struct ndb_lmdb *lmdb; + struct ndb_ingest_controller controller; + struct ndb_id_cb cb; void *buf; size_t bufsize, note_size; - int rc; - // we will use this to check if we already have it in the DB - lmdb = ingester->writer->lmdb; + // we will use this to check if we already have it in the DB during + // ID parsing + controller.read_txn = read_txn; + controller.lmdb = ingester->writer->lmdb; + cb.fn = ndb_ingester_json_controller; + cb.data = &controller; // since we're going to be passing this allocated note to a different // thread, we can't use thread-local buffers. just allocate a block @@ -169,7 +206,12 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, return 0; note_size = - ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize); + ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); + + if (note_size == -42) { + // we already have this! + goto cleanup; + } switch (tce.evtype) { case NDB_TCE_NOTICE: goto cleanup; @@ -182,35 +224,24 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, goto cleanup; } - // let's see if we already have it - MDB_val key, val; - key.mv_size = 32; - key.mv_data = note->id; - rc = mdb_get(read_txn, lmdb->dbis[NDB_DBI_ID], &key, &val); - - if (rc == MDB_NOTFOUND) { - // Verify! If it's an invalid note we don't need to - // bothter writing it to the database - if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { - ndb_debug("signature verification failed\n"); - goto cleanup; - } - - // we didn't find anything. let's send it - // to the writer thread - note = realloc(note, note_size); + // Verify! If it's an invalid note we don't need to + // bothter writing it to the database + if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) { + ndb_debug("signature verification failed\n"); + goto cleanup; + } - out->type = NDB_WRITER_NOTE; - out->note.note = note; - out->note.note_len = note_size; + // we didn't find anything. let's send it + // to the writer thread + note = realloc(note, note_size); - // there's nothing left to do with the original json, so free it - free(ev->json); - return 1; - } + out->type = NDB_WRITER_NOTE; + out->note.note = note; + out->note.note_len = note_size; - // we already have it or there's an error, either way - // cleanup and return 0 + // there's nothing left to do with the original json, so free it + free(ev->json); + return 1; } cleanup: @@ -409,7 +440,7 @@ static int ndb_writer_destroy(struct ndb_writer *writer) msg.type = NDB_WRITER_QUIT; if (!prot_queue_push(&writer->inbox, &msg)) { // queue is too full to push quit message. just kill it. - pthread_exit(writer->thread_id); + pthread_exit(&writer->thread_id); } else { pthread_join(writer->thread_id, NULL); } @@ -641,11 +672,29 @@ static inline int ndb_json_parser_init(struct ndb_json_parser *p, return 1; } -static inline int ndb_json_parser_parse(struct ndb_json_parser *p) +static inline int ndb_json_parser_parse(struct ndb_json_parser *p, + struct ndb_id_cb *cb) { + jsmntok_t *tok; int cap = ((unsigned char *)p->toks_end - (unsigned char*)p->toks)/sizeof(*p->toks); - p->num_tokens = - jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap); + int res = + jsmn_parse(&p->json_parser, p->json, p->json_len, p->toks, cap, cb != NULL); + + // got an ID! + if (res == -42) { + tok = &p->toks[p->json_parser.toknext-1]; + + switch (cb->fn(cb->data, p->json + tok->start)) { + case NDB_IDRES_CONT: + res = jsmn_parse(&p->json_parser, p->json, p->json_len, + p->toks, cap, 0); + break; + case NDB_IDRES_STOP: + return -42; + } + } else { + p->num_tokens = res; + } p->i = 0; @@ -1197,7 +1246,8 @@ static int parse_unsigned_int(const char *start, int len, unsigned int *num) } int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, - unsigned char *buf, int bufsize) + unsigned char *buf, int bufsize, + struct ndb_id_cb *cb) { jsmntok_t *tok = NULL; int tok_len, res; @@ -1207,7 +1257,7 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, tce->subid = ""; ndb_json_parser_init(&parser, json, len, buf, bufsize); - if ((res = ndb_json_parser_parse(&parser)) < 0) + if ((res = ndb_json_parser_parse(&parser, cb)) < 0) return res; if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) @@ -1364,7 +1414,7 @@ int ndb_note_from_json(const char *json, int len, struct ndb_note **note, int res; ndb_json_parser_init(&parser, json, len, buf, bufsize); - if ((res = ndb_json_parser_parse(&parser)) < 0) + if ((res = ndb_json_parser_parse(&parser, NULL)) < 0) return res; if (parser.num_tokens < 1) diff --git a/nostrdb.h b/nostrdb.h @@ -26,6 +26,15 @@ enum tce_type { NDB_TCE_EOSE = 0x4, }; +// function pointer for controlling what to do after we parse an id +typedef enum ndb_idres (*ndb_id_fn)(void *, const char *); + +// id callback + closure data +struct ndb_id_cb { + ndb_id_fn fn; + void *data; +}; + struct ndb_str { unsigned char flag; union { @@ -144,7 +153,7 @@ void ndb_destroy(struct ndb *); // BUILDER int ndb_parse_json_note(struct ndb_json_parser *, struct ndb_note **); -int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, unsigned char *buf, int bufsize); +int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, unsigned char *buf, int bufsize, struct ndb_id_cb *); int ndb_note_from_json(const char *json, int len, struct ndb_note **, unsigned char *buf, int buflen); int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, int bufsize); int ndb_builder_finalize(struct ndb_builder *builder, struct ndb_note **note, struct ndb_keypair *privkey); diff --git a/test.c b/test.c @@ -230,7 +230,7 @@ static void test_parse_contact_event() assert(read_file("testdata/contacts-event.json", (unsigned char*)json, alloc_size, &written)); - assert(ndb_ws_event_from_json(json, written, &tce, buf, alloc_size)); + assert(ndb_ws_event_from_json(json, written, &tce, buf, alloc_size, NULL)); assert(tce.evtype == NDB_TCE_EVENT); @@ -248,7 +248,7 @@ static void test_content_len() assert(read_file("testdata/failed_size.json", (unsigned char*)json, alloc_size, &written)); - assert(ndb_ws_event_from_json(json, written, &tce, buf, alloc_size)); + assert(ndb_ws_event_from_json(json, written, &tce, buf, alloc_size, NULL)); assert(tce.evtype == NDB_TCE_EVENT); assert(ndb_note_content_length(tce.event.note) == 0); @@ -315,7 +315,7 @@ static void test_tce_eose() { struct ndb_tce tce; int ok; - ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf)); + ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf), NULL); assert(ok); assert(tce.evtype == NDB_TCE_EOSE); @@ -329,7 +329,7 @@ static void test_tce_command_result() { struct ndb_tce tce; int ok; - ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf)); + ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf), NULL); assert(ok); assert(tce.evtype == NDB_TCE_OK); @@ -344,7 +344,7 @@ static void test_tce_command_result_empty_msg() { struct ndb_tce tce; int ok; - ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf)); + ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf), NULL); assert(ok); assert(tce.evtype == NDB_TCE_OK); @@ -365,7 +365,7 @@ static void test_tce() { struct ndb_tce tce; int ok; - ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf)); + ok = ndb_ws_event_from_json(json, sizeof(json), &tce, buf, sizeof(buf), NULL); assert(ok); assert(tce.evtype == NDB_TCE_EVENT); diff --git a/threadpool.h b/threadpool.h @@ -90,7 +90,7 @@ static inline void threadpool_destroy(struct threadpool *tp) for (uint64_t i = 0; i < tp->num_threads; i++) { t = &tp->pool[i]; if (!prot_queue_push(&t->inbox, tp->quit_msg)) { - pthread_exit(t->thread_id); + pthread_exit(&t->thread_id); } else { pthread_join(t->thread_id, NULL); }