nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit e03588e29e0d6a8ab5371047c8e8bd8e99e7e1d1
parent 7365c20e5277431632d43f730ae48e79750373c4
Author: William Casarin <jb55@jb55.com>
Date:   Thu, 28 Nov 2024 10:55:35 -0800

Merge Windows support

Diffstat:
Mccan/ccan/tal/str/str.c | 53++---------------------------------------------------
Mccan/ccan/tal/tal.c | 3++-
Msrc/content_parser.c | 17+++++++++++++++++
Msrc/nostr_bech32.c | 6++++--
Msrc/nostrdb.c | 50+++++++++++++++++++++++++++++++++++---------------
Msrc/nostrdb.h | 4++++
Msrc/print_util.h | 4++--
Msrc/protected_queue.h | 5++++-
Asrc/thread.h | 63+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/threadpool.h | 6+++---
Msrc/util.h | 8--------
Asrc/win.h | 9+++++++++
12 files changed, 145 insertions(+), 83 deletions(-)

diff --git a/ccan/ccan/tal/str/str.c b/ccan/ccan/tal/str/str.c @@ -1,14 +1,13 @@ /* Licensed under BSD-MIT - see LICENSE file for details */ -#include <unistd.h> +//#include <unistd.h> #include <stdint.h> #include <string.h> #include <limits.h> #include <stdlib.h> #include "str.h" #include <sys/types.h> -#include <regex.h> +//#include <regex.h> #include <stdarg.h> -#include <unistd.h> #include <stdio.h> #include <ccan/str/str.h> @@ -236,51 +235,3 @@ static size_t count_open_braces(const char *string) #endif } -bool tal_strreg_(const tal_t *ctx, const char *string, const char *label, - const char *regex, ...) -{ - size_t nmatch = 1 + count_open_braces(regex); - regmatch_t matches[nmatch]; - regex_t r; - bool ret = false; - unsigned int i; - va_list ap; - - if (regcomp(&r, regex, REG_EXTENDED) != 0) - goto fail_no_re; - - if (regexec(&r, string, nmatch, matches, 0) != 0) - goto fail; - - ret = true; - va_start(ap, regex); - for (i = 1; i < nmatch; i++) { - char **arg = va_arg(ap, char **); - if (arg) { - /* eg. ([a-z])? can give "no match". */ - if (matches[i].rm_so == -1) - *arg = NULL; - else { - *arg = tal_strndup_(ctx, - string + matches[i].rm_so, - matches[i].rm_eo - - matches[i].rm_so, - label); - /* FIXME: If we fail, we set some and leak! */ - if (!*arg) { - ret = false; - break; - } - } - } - } - va_end(ap); -fail: - regfree(&r); -fail_no_re: - if (taken(regex)) - tal_free(regex); - if (taken(string)) - tal_free(string); - return ret; -} diff --git a/ccan/ccan/tal/tal.c b/ccan/ccan/tal/tal.c @@ -952,9 +952,10 @@ static bool check_err(struct tal_hdr *t, const char *errorstr, { if (errorstr) { /* Try not to malloc: it may be corrupted. */ - char msg[strlen(errorstr) + 20 + strlen(errmsg) + 1]; + char *msg = malloc(strlen(errorstr) + 20 + strlen(errmsg) + 1); sprintf(msg, "%s:%p %s", errorstr, from_tal_hdr(t), errmsg); call_error(msg); + free(msg); } return false; } diff --git a/src/content_parser.c b/src/content_parser.c @@ -3,8 +3,13 @@ #include "block.h" #include "nostrdb.h" #include "invoice.h" + +#ifndef _WIN32 #include "bolt11/bolt11.h" +#endif + #include "bolt11/bech32.h" + #include <stdlib.h> #include <string.h> @@ -164,6 +169,11 @@ fail: static int push_invoice_str(struct ndb_content_parser *p, struct ndb_str_block *str) { +#ifdef _WIN32 + // we shouldn't be pushing invoices on windows until we fix + // bolt11 parser portability + return 0; +#else unsigned char *start; struct bolt11 *bolt11; char *fail; @@ -186,6 +196,7 @@ static int push_invoice_str(struct ndb_content_parser *p, struct ndb_str_block * tal_free(bolt11); return 1; +#endif } int push_block(struct ndb_content_parser *p, struct ndb_block *block); @@ -455,6 +466,11 @@ static int parse_url(struct cursor *cur, struct ndb_block *block) { static int parse_invoice(struct cursor *cur, struct ndb_block *block) { unsigned char *start, *end; +#ifdef _WIN32 + // bolt11 stuff requires non-portable cc stuff, so ignore for now + return 0; +#else + // optional parse_str(cur, "lightning:"); @@ -478,6 +494,7 @@ static int parse_invoice(struct cursor *cur, struct ndb_block *block) { cur->p = end; return 1; +#endif } diff --git a/src/nostr_bech32.c b/src/nostr_bech32.c @@ -286,7 +286,7 @@ int parse_nostr_bech32(unsigned char *buf, int buflen, unsigned char *start; size_t parsed_len, u5_out_len, u8_out_len; enum nostr_bech32_type type; - static const int MAX_PREFIX = 8; +#define MAX_PREFIX 8 struct cursor cur, bech32, u8; make_cursor(buf, buf + buflen, &cur); @@ -302,7 +302,7 @@ int parse_nostr_bech32(unsigned char *buf, int buflen, if (parsed_len < 10 || parsed_len > 10000) return 0; - unsigned char u5[parsed_len]; + unsigned char *u5 = malloc(parsed_len); char prefix[MAX_PREFIX]; if (bech32_decode_len(prefix, u5, &u5_out_len, (const char*)start, @@ -314,6 +314,8 @@ int parse_nostr_bech32(unsigned char *buf, int buflen, if (!bech32_convert_bits(cur.p, &u8_out_len, 8, u5, u5_out_len, 5, 0)) return 0; + free(u5); + make_cursor(cur.p, cur.p + u8_out_len, &u8); return parse_nostr_bech32_buffer(&u8, type, obj); diff --git a/src/nostrdb.c b/src/nostrdb.c @@ -12,6 +12,7 @@ #include "cpu.h" #include "block.h" #include "threadpool.h" +#include "thread.h" #include "protected_queue.h" #include "memchr.h" #include "print_util.h" @@ -32,7 +33,7 @@ #define min(a,b) ((a) < (b) ? (a) : (b)) // the maximum number of things threads pop and push in bulk -static const int THREAD_QUEUE_BATCH = 4096; +#define THREAD_QUEUE_BATCH 4096 // maximum number of active subscriptions #define MAX_SUBSCRIPTIONS 256 @@ -84,7 +85,6 @@ struct ndb_tag { struct ndb_tags { uint16_t padding; uint16_t count; - struct ndb_tag tag[0]; }; // v1 @@ -364,8 +364,8 @@ static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b) if ((cmp = mdb_cmp_memn(&va, &vb))) return cmp; - ts_a = *(uint64_t*)(va.mv_data + va.mv_size); - ts_b = *(uint64_t*)(vb.mv_data + vb.mv_size); + ts_a = *(uint64_t*)((unsigned char *)va.mv_data + va.mv_size); + ts_b = *(uint64_t*)((unsigned char *)vb.mv_data + vb.mv_size); if (ts_a < ts_b) return -1; @@ -381,8 +381,8 @@ static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b) uint64_t sa, sb, nid_a, nid_b; MDB_val a2, b2; - make_cursor(a->mv_data, a->mv_data + a->mv_size, &ca); - make_cursor(b->mv_data, b->mv_data + b->mv_size, &cb); + make_cursor(a->mv_data, (unsigned char *)a->mv_data + a->mv_size, &ca); + make_cursor(b->mv_data, (unsigned char *)b->mv_data + b->mv_size, &cb); // note_id if (unlikely(!cursor_pull_varint(&ca, &nid_a) || !cursor_pull_varint(&cb, &nid_b))) @@ -3035,7 +3035,7 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn, if (taglen != k.mv_size - 9) break; - if (memcmp(k.mv_data+1, tag, k.mv_size-9)) + if (memcmp((unsigned char *)k.mv_data+1, tag, k.mv_size-9)) break; note_id = *(uint64_t*)v.mv_data; @@ -3538,7 +3538,7 @@ static int ndb_text_search_next_word(MDB_cursor *cursor, MDB_cursor_op op, int retries; retries = -1; - make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); + make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); // When op is MDB_SET_RANGE, this initializes the search. Position // the cursor at the next key greater than or equal to the specified @@ -3567,7 +3567,7 @@ retry: printf("\n"); */ - make_cursor(k->mv_data, k->mv_data + k->mv_size, &key_cursor); + make_cursor(k->mv_data, (unsigned char *)k->mv_data + k->mv_size, &key_cursor); if (unlikely(!ndb_unpack_text_search_key_noteid(&key_cursor, &result->key.note_id))) { fprintf(stderr, "UNUSUAL: failed to unpack text search key note_id\n"); @@ -3980,6 +3980,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, static void *ndb_writer_thread(void *data) { + ndb_debug("started writer thread\n"); struct ndb_writer *writer = data; struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; struct written_note written_notes[THREAD_QUEUE_BATCH]; @@ -3999,6 +4000,7 @@ static void *ndb_writer_thread(void *data) while (!done) { txn.mdb_txn = NULL; num_notes = 0; + ndb_debug("writer waiting for items\n"); popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); ndb_debug("writer popped %d items\n", popped); @@ -4029,6 +4031,7 @@ static void *ndb_writer_thread(void *data) switch (msg->type) { case NDB_WRITER_QUIT: // quits are handled before this + ndb_debug("writer thread got quit message\n"); done = 1; continue; case NDB_WRITER_PROFILE: @@ -4098,7 +4101,7 @@ static void *ndb_writer_thread(void *data) free(msg->note.note); } else if (msg->type == NDB_WRITER_PROFILE) { free(msg->profile.note.note); - ndb_profile_record_builder_free(&msg->profile.record); + //ndb_profile_record_builder_free(&msg->profile.record); } else if (msg->type == NDB_WRITER_BLOCKS) { ndb_blocks_free(msg->blocks.blocks); } @@ -4199,7 +4202,7 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, writer->queue_buflen, sizeof(struct ndb_writer_msg)); // spin up the writer thread - if (pthread_create(&writer->thread_id, NULL, ndb_writer_thread, writer)) + if (THREAD_CREATE(writer->thread_id, ndb_writer_thread, writer)) { fprintf(stderr, "ndb writer thread failed to create\n"); return 0; @@ -4242,14 +4245,18 @@ static int ndb_writer_destroy(struct ndb_writer *writer) // kill thread msg.type = NDB_WRITER_QUIT; + ndb_debug("writer: pushing quit message\n"); if (!prot_queue_push(&writer->inbox, &msg)) { // queue is too full to push quit message. just kill it. - pthread_exit(&writer->thread_id); + ndb_debug("writer: terminating thread\n"); + THREAD_TERMINATE(writer->thread_id); } else { - pthread_join(writer->thread_id, NULL); + ndb_debug("writer: joining thread\n"); + THREAD_FINISH(writer->thread_id); } // cleanup + ndb_debug("writer: cleaning up protected queue\n"); prot_queue_destroy(&writer->inbox); free(writer->queue_buf); @@ -4515,12 +4522,17 @@ void ndb_destroy(struct ndb *ndb) return; // ingester depends on writer and must be destroyed first + ndb_debug("destroying ingester\n"); ndb_ingester_destroy(&ndb->ingester); + ndb_debug("destroying writer\n"); ndb_writer_destroy(&ndb->writer); + ndb_debug("destroying monitor\n"); ndb_monitor_destroy(&ndb->monitor); + ndb_debug("closing env\n"); mdb_env_close(ndb->lmdb.env); + ndb_debug("ndb destroyed\n"); free(ndb); } @@ -4588,6 +4600,8 @@ int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, in return 1; } +#ifndef _WIN32 +// TODO: windows int ndb_process_events_stream(struct ndb *ndb, FILE* fp) { char *line = NULL; @@ -4605,6 +4619,7 @@ int ndb_process_events_stream(struct ndb *ndb, FILE* fp) return 1; } +#endif int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) { @@ -6126,7 +6141,7 @@ int ndb_stat(struct ndb *ndb, struct ndb_stat *stat) /// Push an element to the current tag /// /// Basic idea is to call ndb_builder_new_tag -inline int ndb_builder_push_tag_str(struct ndb_builder *builder, +int ndb_builder_push_tag_str(struct ndb_builder *builder, const char *str, int len) { union ndb_packed_str pstr; @@ -6338,12 +6353,17 @@ void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter) iter->index = -1; } +// Helper function to get a pointer to the nth tag +static struct ndb_tag *ndb_tags_tag(struct ndb_tags *tags, size_t index) { + return (struct ndb_tag *)((uint8_t *)tags + sizeof(struct ndb_tags) + index * sizeof(struct ndb_tag)); +} + int ndb_tags_iterate_next(struct ndb_iterator *iter) { struct ndb_tags *tags; if (iter->tag == NULL || iter->index == -1) { - iter->tag = iter->note->tags.tag; + iter->tag = ndb_tags_tag(&iter->note->tags, 0); iter->index = 0; return iter->note->tags.count != 0; } diff --git a/src/nostrdb.h b/src/nostrdb.h @@ -2,6 +2,7 @@ #define NOSTRDB_H #include <inttypes.h> +#include "win.h" #include "cursor.h" // maximum number of filters allowed in a filter group @@ -455,7 +456,10 @@ int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *); int ndb_db_version(struct ndb *ndb); int ndb_process_event(struct ndb *, const char *json, int len); int ndb_process_events(struct ndb *, const char *ldjson, size_t len); +#ifndef _WIN32 +// TODO: fix on windows int ndb_process_events_stream(struct ndb *, FILE* fp); +#endif int ndb_process_client_event(struct ndb *, const char *json, int len); int ndb_process_client_events(struct ndb *, const char *json, size_t len); int ndb_begin_query(struct ndb *, struct ndb_txn *); diff --git a/src/print_util.h b/src/print_util.h @@ -22,12 +22,12 @@ static void print_tag_kv(struct ndb_txn *txn, MDB_val *k, MDB_val *v) struct ndb_note *note; uint64_t ts; - ts = *(uint64_t*)(k->mv_data+(k->mv_size-8)); + ts = *(uint64_t*)((uint8_t*)k->mv_data+(k->mv_size-8)); // TODO: p tags, etc if (((const char*)k->mv_data)[0] == 'e' && k->mv_size == (1 + 32 + 8)) { printf("note_tags 'e"); - print_hex(k->mv_data+1, 32); + print_hex((uint8_t*)k->mv_data+1, 32); printf("' %" PRIu64, ts); } else { printf("note_tags '%.*s' %" PRIu64, (int)k->mv_size-8, diff --git a/src/protected_queue.h b/src/protected_queue.h @@ -12,12 +12,15 @@ #ifndef PROT_QUEUE_H #define PROT_QUEUE_H -#include <pthread.h> #include <stdbool.h> #include <stddef.h> #include <string.h> #include "cursor.h" #include "util.h" +#include "thread.h" + +#define max(a,b) ((a) > (b) ? (a) : (b)) +#define min(a,b) ((a) < (b) ? (a) : (b)) /* * The prot_queue structure represents a thread-safe queue that can hold diff --git a/src/thread.h b/src/thread.h @@ -0,0 +1,63 @@ +#ifndef NDB_THREAD_H +#define NDB_THREAD_H + +#ifdef _WIN32 + #include <windows.h> + + #define ErrCode() GetLastError() +// Define POSIX-like thread types +typedef HANDLE pthread_t; +typedef CRITICAL_SECTION pthread_mutex_t; +typedef CONDITION_VARIABLE pthread_cond_t; + +#define ErrCode() GetLastError() + +// Mutex functions +#define pthread_mutex_init(mutex, attr) \ + (InitializeCriticalSection(mutex), 0) + +#define pthread_mutex_destroy(mutex) \ + (DeleteCriticalSection(mutex), 0) + +#define pthread_mutex_lock(mutex) \ + (EnterCriticalSection(mutex), 0) + +#define pthread_mutex_unlock(mutex) \ + (LeaveCriticalSection(mutex), 0) + +// Condition variable functions +#define pthread_cond_init(cond, attr) \ + (InitializeConditionVariable(cond), 0) + +#define pthread_cond_destroy(cond) + +#define pthread_cond_signal(cond) \ + (WakeConditionVariable(cond), 0) + +#define pthread_cond_wait(cond, mutex) \ + (SleepConditionVariableCS(cond, mutex, INFINITE) ? 0 : ErrCode()) + +// Thread functions +#define THREAD_CREATE(thr, start, arg) \ + (((thr = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)start, arg, 0, NULL)) != NULL) ? 0 : ErrCode()) + +#define THREAD_FINISH(thr) \ + (WaitForSingleObject(thr, INFINITE), CloseHandle(thr), 0) + +#define THREAD_TERMINATE(thr) \ + (TerminateThread(thr, 0) ? ErrCode() : 0) + +#else // _WIN32 + #include <pthread.h> + + //#define ErrCode() errno + #define THREAD_CREATE(thr,start,arg) pthread_create(&thr,NULL,start,arg) + #define THREAD_FINISH(thr) pthread_join(thr,NULL) + #define THREAD_TERMINATE(thr) pthread_exit(&thr) + + #define LOCK_MUTEX(mutex) pthread_mutex_lock(mutex) + #define UNLOCK_MUTEX(mutex) pthread_mutex_unlock(mutex) + +#endif + +#endif // NDB_THREAD_H diff --git a/src/threadpool.h b/src/threadpool.h @@ -55,7 +55,7 @@ static int threadpool_init(struct threadpool *tp, int num_threads, return 0; } - if (pthread_create(&t->thread_id, NULL, thread_fn, t) != 0) { + if (THREAD_CREATE(t->thread_id, thread_fn, t) != 0) { fprintf(stderr, "threadpool_init: failed to create thread\n"); return 0; } @@ -90,9 +90,9 @@ static inline void threadpool_destroy(struct threadpool *tp) for (int i = 0; i < tp->num_threads; i++) { t = &tp->pool[i]; if (!prot_queue_push(&t->inbox, tp->quit_msg)) { - pthread_exit(&t->thread_id); + THREAD_TERMINATE(t->thread_id); } else { - pthread_join(t->thread_id, NULL); + THREAD_FINISH(t->thread_id); } prot_queue_destroy(&t->inbox); free(t->qmem); diff --git a/src/util.h b/src/util.h @@ -2,14 +2,6 @@ #ifndef NDB_UTIL_H #define NDB_UTIL_H -static inline int min(int a, int b) { - return a < b ? a : b; -} - -static inline int max(int a, int b) { - return a > b ? a : b; -} - static inline void* memdup(const void* src, size_t size) { void* dest = malloc(size); if (dest == NULL) { diff --git a/src/win.h b/src/win.h @@ -0,0 +1,9 @@ +#ifndef NOSTRDB_WIN_H +#define NOSTRDB_WIN_H +#ifdef _WIN32 + +typedef ptrdiff_t ssize_t; // ssize_t is typically a signed version of size_t + +#endif +#endif // NOSTRDB_WIN_H +