nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit 66a4fe3f24b212c5e8c7c240149f48916c910b22
parent 8faf6ce7b740f405b2cc94dfe512533de97c0b56
Author: William Casarin <jb55@jb55.com>
Date:   Thu,  4 Apr 2024 21:22:35 +0100

add ability to register a subscription callback

Since Damus iOS is not an immediate-mode UI like android, we would
rather not poll for results. Instead we need a way to register a
callback function that is called when we get new subscription results.

This is also useful on the android side, allowing us to request a new
frame to draw when we have new results, instead of drawing every second.

Signed-off-by: William Casarin <jb55@jb55.com>

Diffstat:
Msrc/nostrdb.c | 31++++++++++++++++++++++++++++---
Msrc/nostrdb.h | 6++++++
2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/src/nostrdb.c b/src/nostrdb.c @@ -191,6 +191,8 @@ struct ndb_subscription { struct ndb_monitor { struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS]; + ndb_sub_fn sub_cb; + void *sub_cb_ctx; int num_subscriptions; }; @@ -3738,6 +3740,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, struct written_note *wrote, int num_notes) { int i, k; + int pushed; struct written_note *written; struct ndb_note *note; struct ndb_subscription *sub; @@ -3746,20 +3749,31 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, sub = &monitor->subscriptions[i]; ndb_debug("checking subscription %d, %d notes\n", i, num_notes); + pushed = 0; for (k = 0; k < num_notes; k++) { written = &wrote[k]; note = written->note->note; if (ndb_filter_group_matches(&sub->group, note)) { ndb_debug("pushing note\n"); + if (!prot_queue_push(&sub->inbox, &written->note_id)) { ndb_debug("couldn't push note to subscriber"); + } else { + pushed++; } } else { ndb_debug("not pushing note\n"); } } + // After pushing all of the matching notes, check to see if we + // have a registered subscription callback. If so, we call it. + // The callback needs to call ndb_poll_for_notes to pull data + // that was just pushed to the queue in the for loop above. + if (monitor->sub_cb != NULL && pushed > 0) { + monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); + } } } @@ -4234,9 +4248,12 @@ static int ndb_run_migrations(struct ndb *ndb) return 1; } -static void ndb_monitor_init(struct ndb_monitor *monitor) +static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, + void *sub_cb_ctx) { - memset(monitor, 0, sizeof(*monitor)); + monitor->num_subscriptions = 0; + monitor->sub_cb = cb; + monitor->sub_cb_ctx = sub_cb_ctx; } void ndb_filter_group_destroy(struct ndb_filter_group *group) @@ -4281,7 +4298,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize)) return 0; - ndb_monitor_init(&ndb->monitor); + ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) { fprintf(stderr, "ndb_writer_init failed\n"); @@ -5496,6 +5513,14 @@ void ndb_default_config(struct ndb_config *config) config->flags = 0; config->ingest_filter = NULL; config->filter_context = NULL; + config->sub_cb_ctx = NULL; + config->sub_cb = NULL; +} + +void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) +{ + config->sub_cb_ctx = context; + config->sub_cb = fn; } void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) diff --git a/src/nostrdb.h b/src/nostrdb.h @@ -64,6 +64,9 @@ struct ndb_keypair { // function pointer for controlling what to do after we parse an id typedef enum ndb_idres (*ndb_id_fn)(void *, const char *); +// callback function for when we receive new subscription results +typedef void (*ndb_sub_fn)(void *, uint64_t subid); + // id callback + closure data struct ndb_id_cb { ndb_id_fn fn; @@ -256,6 +259,8 @@ struct ndb_config { size_t mapsize; void *filter_context; ndb_ingest_filter_fn ingest_filter; + void *sub_cb_ctx; + ndb_sub_fn sub_cb; }; struct ndb_text_search_config { @@ -426,6 +431,7 @@ void ndb_config_set_ingest_threads(struct ndb_config *config, int threads); void ndb_config_set_flags(struct ndb_config *config, int flags); void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize); void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *); +void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *ctx); // HELPERS int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen);