commit 67f3a5e4df3ecf9be0f1971d14123e7e0d17381f
parent d90666a206d14095d1b9d53642c2711a96f4cac4
Author: William Casarin <jb55@jb55.com>
Date: Mon, 9 Dec 2024 14:48:31 -0800
make the subscription monitor threadsafe
The subscription monitor was the only thing that wasn't threadsafe.
Add a simple mutex instead of a queue so that polling stays quick.
This also means we can no longer safely hand out pointers to the
internal subscription filters, so ndb_subscription_filters is removed
for now until we have a safer interface.
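For illustration, a minimal standalone sketch of the locking pattern
this adopts (simplified stand-in types, not the real ndb structs):
pollers take the mutex, scan the subscription list, and release it,
instead of round-tripping through a message queue:

    #include <pthread.h>
    #include <stdint.h>

    struct sub { uint64_t subid; };

    struct monitor {
            struct sub subs[32];
            int num_subs;
            pthread_mutex_t mutex;
    };

    static struct sub *find_sub(struct monitor *mon, uint64_t subid)
    {
            int i;
            for (i = 0; i < mon->num_subs; i++) {
                    if (mon->subs[i].subid == subid)
                            return &mon->subs[i];
            }
            return NULL;
    }

    // a poll is just a short critical section, cheap enough to
    // call from any thread on every tick
    static int poll_sub(struct monitor *mon, uint64_t subid)
    {
            int found;

            pthread_mutex_lock(&mon->mutex);
            found = find_sub(mon, subid) != NULL;
            // pop pending note ids here, still holding the lock
            pthread_mutex_unlock(&mon->mutex);

            return found;
    }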
Fixes: https://github.com/damus-io/nostrdb/issues/55
Signed-off-by: William Casarin <jb55@jb55.com>
Diffstat:
2 files changed, 87 insertions(+), 48 deletions(-)
diff --git a/src/nostrdb.c b/src/nostrdb.c
@@ -170,9 +170,10 @@ struct ndb_writer {
};
struct ndb_ingester {
+ struct ndb_lmdb *lmdb;
uint32_t flags;
struct threadpool tp;
- struct ndb_writer *writer;
+ struct prot_queue *writer_inbox;
void *filter_context;
ndb_ingest_filter_fn filter;
};
@@ -193,6 +194,11 @@ struct ndb_monitor {
ndb_sub_fn sub_cb;
void *sub_cb_ctx;
int num_subscriptions;
+
+ // the monitor doesn't get a full inbox. We want pollers to be able
+ // to poll subscriptions efficiently without going through a message
+ // queue, so we use a simple mutex here.
+ pthread_mutex_t mutex;
};
struct ndb {
@@ -1721,13 +1727,6 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
return 1;
}
-static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
- struct ndb_writer_msg *msgs,
- int num_msgs)
-{
- return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
-}
-
static int ndb_writer_queue_note(struct ndb_writer *writer,
struct ndb_note *note, size_t note_len)
{
@@ -2200,7 +2199,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
// we will use this to check if we already have it in the DB during
// ID parsing
controller.read_txn = read_txn;
- controller.lmdb = ingester->writer->lmdb;
+ controller.lmdb = ingester->lmdb;
cb.fn = ndb_ingester_json_controller;
cb.data = &controller;
@@ -3928,6 +3927,14 @@ static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
//fprintf(stderr, "writing version %" PRIu64 "\n", version);
}
+static void ndb_monitor_lock(struct ndb_monitor *mon) {
+ pthread_mutex_lock(&mon->mutex);
+}
+
+static void ndb_monitor_unlock(struct ndb_monitor *mon) {
+ pthread_mutex_unlock(&mon->mutex);
+}
+
struct written_note {
uint64_t note_id;
struct ndb_writer_note *note;
@@ -3945,6 +3952,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
struct ndb_note *note;
struct ndb_subscription *sub;
+ ndb_monitor_lock(monitor);
+
for (i = 0; i < monitor->num_subscriptions; i++) {
sub = &monitor->subscriptions[i];
ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
@@ -3975,6 +3984,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
}
}
+
+ ndb_monitor_unlock(monitor);
}
static void *ndb_writer_thread(void *data)
@@ -4117,7 +4128,7 @@ static void *ndb_ingester_thread(void *data)
secp256k1_context *ctx;
struct thread *thread = data;
struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
- struct ndb_lmdb *lmdb = ingester->writer->lmdb;
+ struct ndb_lmdb *lmdb = ingester->lmdb;
struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
int i, to_write, popped, done, any_event;
@@ -4172,7 +4183,7 @@ static void *ndb_ingester_thread(void *data)
if (to_write > 0) {
ndb_debug("pushing %d events to write queue\n", to_write);
- if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) {
+ if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) {
ndb_debug("failed pushing %d events to write queue\n", to_write);
}
}
@@ -4212,7 +4223,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
// initialize the ingester queue and then spawn the thread
static int ndb_ingester_init(struct ndb_ingester *ingester,
- struct ndb_writer *writer,
+ struct ndb_lmdb *lmdb,
+ struct prot_queue *writer_inbox,
const struct ndb_config *config)
{
int elem_size, num_elems;
@@ -4222,7 +4234,8 @@ static int ndb_ingester_init(struct ndb_ingester *ingester,
elem_size = sizeof(struct ndb_ingester_msg);
num_elems = DEFAULT_QUEUE_SIZE;
- ingester->writer = writer;
+ ingester->writer_inbox = writer_inbox;
+ ingester->lmdb = lmdb;
ingester->flags = config->flags;
ingester->filter = config->ingest_filter;
ingester->filter_context = config->filter_context;
@@ -4448,6 +4461,7 @@ static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
monitor->num_subscriptions = 0;
monitor->sub_cb = cb;
monitor->sub_cb_ctx = sub_cb_ctx;
+ pthread_mutex_init(&monitor->mutex, NULL);
}
void ndb_filter_group_destroy(struct ndb_filter_group *group)
@@ -4499,7 +4513,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
return 0;
}
- if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
+ if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) {
fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
config->ingester_threads);
return 0;
@@ -6544,13 +6558,15 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
return blocks;
}
-struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index)
+// please call ndb_monitor_lock before calling this
+static struct ndb_subscription *
+ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index)
{
struct ndb_subscription *sub, *tsub;
int i;
- for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) {
- tsub = &ndb->monitor.subscriptions[i];
+ for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) {
+ tsub = &monitor->subscriptions[i];
if (tsub->subid == subid) {
sub = tsub;
if (index)
@@ -6566,38 +6582,63 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
int note_id_capacity)
{
struct ndb_subscription *sub;
+ int res;
if (subid == 0)
return 0;
- if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
- return 0;
+ ndb_monitor_lock(&ndb->monitor);
+
+ if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL)))
+ res = 0;
+ else
+ res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
+
+ ndb_monitor_unlock(&ndb->monitor);
- return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
+ return res;
}
int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
- int note_id_capacity)
+ int note_id_capacity)
{
struct ndb_subscription *sub;
+ struct prot_queue *queue_inbox;
- // this is not a valid subscription id
+ // this is not a valid subscription id
if (subid == 0)
return 0;
- if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
+ ndb_monitor_lock(&ndb->monitor);
+
+ if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) {
+ ndb_monitor_unlock(&ndb->monitor);
return 0;
+ }
+
+ queue_inbox = &sub->inbox;
- return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
+ ndb_monitor_unlock(&ndb->monitor);
+
+ // there is technically a race condition if the thread yields at
+ // this point and a subscription is added/removed. A deadlock in the
+ // writer queue would be much worse though. This function is dubious
+ // anyway.
+
+ return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity);
}
int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
{
struct ndb_subscription *sub;
- int index, elems_to_move;
+ int index, res, elems_to_move;
- if (!(sub = ndb_find_subscription(ndb, subid, &index)))
- return 0;
+ ndb_monitor_lock(&ndb->monitor);
+
+ if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) {
+ res = 0;
+ goto done;
+ }
ndb_subscription_destroy(sub);
@@ -6607,21 +6648,12 @@ int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
&ndb->monitor.subscriptions[index+1],
elems_to_move * sizeof(*sub));
- return 1;
-}
+ res = 1;
-struct ndb_filter *ndb_subscription_filters(struct ndb *ndb, uint64_t subid, int *filters)
-{
- struct ndb_subscription *sub;
+done:
+ ndb_monitor_unlock(&ndb->monitor);
- sub = ndb_find_subscription(ndb, subid, NULL);
- if (sub) {
- *filters = sub->group.num_filters;
- return sub->group.filters;
- }
-
- *filters = 0;
- return NULL;
+ return res;
}
int ndb_num_subscriptions(struct ndb *ndb)
@@ -6633,24 +6665,27 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
{
static uint64_t subids = 0;
struct ndb_subscription *sub;
- int index;
size_t buflen;
uint64_t subid;
char *buf;
+ ndb_monitor_lock(&ndb->monitor);
+
if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
fprintf(stderr, "too many subscriptions\n");
- return 0;
+ subid = 0;
+ goto done;
}
- index = ndb->monitor.num_subscriptions++;
- sub = &ndb->monitor.subscriptions[index];
+ sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions];
subid = ++subids;
sub->subid = subid;
ndb_filter_group_init(&sub->group);
- if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters))
- return 0;
+ if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) {
+ subid = 0;
+ goto done;
+ }
// 500k ought to be enough for anyone
buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE;
@@ -6658,8 +6693,13 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
fprintf(stderr, "failed to push prot queue\n");
- return 0;
+ subid = 0;
+ goto done;
}
+ ndb->monitor.num_subscriptions++;
+done:
+ ndb_monitor_unlock(&ndb->monitor);
+
return subid;
}
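An aside on the ndb_wait_for_notes change above: the blocking pop now
runs after the monitor lock is released, because holding the mutex
across a blocking prot_queue_pop_all would stall every poller and the
writer thread's notify pass. The cost is the small race the in-diff
comment admits. A hedged sketch of that shape, with simplified
stand-in types and a hypothetical blocking_pop() in place of the real
prot_queue API:

    #include <pthread.h>
    #include <stdint.h>

    struct inbox { int pending; };
    struct sub { uint64_t subid; struct inbox inbox; };
    struct monitor {
            struct sub subs[32];
            int num_subs;
            pthread_mutex_t mutex;
    };

    // stub standing in for a blocking prot_queue pop
    static int blocking_pop(struct inbox *q, uint64_t *ids, int cap)
    {
            (void)q; (void)ids; (void)cap;
            return 0;
    }

    static int wait_for_notes(struct monitor *mon, uint64_t subid,
                              uint64_t *ids, int cap)
    {
            struct inbox *q = NULL;
            int i;

            pthread_mutex_lock(&mon->mutex);
            for (i = 0; i < mon->num_subs; i++) {
                    if (mon->subs[i].subid == subid) {
                            q = &mon->subs[i].inbox;
                            break;
                    }
            }
            pthread_mutex_unlock(&mon->mutex);

            if (!q)
                    return 0;

            // if an unsubscribe lands here, q dangles: the small
            // race the in-diff comment accepts, since blocking
            // while holding the monitor lock could deadlock instead
            return blocking_pop(q, ids, cap);
    }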
diff --git a/src/nostrdb.h b/src/nostrdb.h
@@ -530,7 +530,6 @@ int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int not
int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
int ndb_unsubscribe(struct ndb *, uint64_t subid);
int ndb_num_subscriptions(struct ndb *);
-struct ndb_filter *ndb_subscription_filters(struct ndb *, uint64_t subid, int *filters);
// FULLTEXT SEARCH
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
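For reference, a hedged usage sketch against the declarations above
(assumes a struct ndb *ndb and struct ndb_filter *filter initialized
elsewhere; error handling elided). With the monitor guarded by its
mutex, subscribe/poll/unsubscribe can be issued from any thread:

    #include <stdint.h>
    #include "nostrdb.h"

    static void drain_notes(struct ndb *ndb, struct ndb_filter *filter)
    {
            uint64_t subid, note_ids[64];
            int n;

            if (!(subid = ndb_subscribe(ndb, filter, 1)))
                    return;

            // ndb_poll_for_notes takes the monitor mutex internally
            while ((n = ndb_poll_for_notes(ndb, subid, note_ids, 64)) > 0) {
                    // ... process note_ids[0..n-1] ...
            }

            ndb_unsubscribe(ndb, subid);
    }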