commit d275f9fc55b8b3e6931366677223081761afc0a6
parent cd0f26ae81e845e405fb2f12ba50110b1a8aa252
Author: William Casarin <jb55@jb55.com>
Date: Wed, 10 Apr 2024 18:06:45 -0700
add ndb_unsubscribe
We didn't have a way to unsubscribe from subscriptions. Now we do!
Apps like notecrumbs may open up many local subscriptions based on
incoming requests. We may need to make the MAX_SUBSCRIPTIONS size much
larger, but this should be okish for now.
Changelog-Added: Add ndb_unsubscribe to unsubscribe from subscriptions
Signed-off-by: William Casarin <jb55@jb55.com>
Diffstat:
3 files changed, 46 insertions(+), 14 deletions(-)
diff --git a/src/nostrdb.c b/src/nostrdb.c
@@ -35,7 +35,7 @@
static const int THREAD_QUEUE_BATCH = 4096;
// maximum number of active subscriptions
-#define MAX_SUBSCRIPTIONS 32
+#define MAX_SUBSCRIPTIONS 256
#define MAX_SCAN_CURSORS 12
#define MAX_FILTERS 16
@@ -4236,7 +4236,7 @@ static int ndb_run_migrations(struct ndb *ndb)
static void ndb_monitor_init(struct ndb_monitor *monitor)
{
- monitor->num_subscriptions = 0;
+ memset(monitor, 0, sizeof(*monitor));
}
void ndb_filter_group_destroy(struct ndb_filter_group *group)
@@ -4249,18 +4249,19 @@ void ndb_filter_group_destroy(struct ndb_filter_group *group)
}
}
+static void ndb_subscription_destroy(struct ndb_subscription *sub)
+{
+ ndb_filter_group_destroy(&sub->group);
+ prot_queue_destroy(&sub->inbox);
+ sub->subid = 0;
+}
+
static void ndb_monitor_destroy(struct ndb_monitor *monitor)
{
int i;
- struct ndb_subscription *sub;
- struct ndb_filter_group *group;
for (i = 0; i < monitor->num_subscriptions; i++) {
- sub = &monitor->subscriptions[i];
- group = &sub->group;
-
- ndb_filter_group_destroy(group);
- prot_queue_destroy(&sub->inbox);
+ ndb_subscription_destroy(&monitor->subscriptions[i]);
}
}
@@ -5867,7 +5868,7 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
return blocks;
}
-struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid)
+struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index)
{
struct ndb_subscription *sub, *tsub;
int i;
@@ -5876,6 +5877,8 @@ struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid)
tsub = &ndb->monitor.subscriptions[i];
if (tsub->subid == subid) {
sub = tsub;
+ if (index)
+ *index = i;
break;
}
}
@@ -5891,7 +5894,7 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
if (subid == 0)
return 0;
- if (!(sub = ndb_find_subscription(ndb, subid)))
+ if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
return 0;
return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
@@ -5906,12 +5909,36 @@ int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
if (subid == 0)
return 0;
- if (!(sub = ndb_find_subscription(ndb, subid)))
+ if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
return 0;
return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
}
+int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
+{
+ struct ndb_subscription *sub;
+ int index, elems_to_move;
+
+ if (!(sub = ndb_find_subscription(ndb, subid, &index)))
+ return 0;
+
+ ndb_subscription_destroy(sub);
+
+ elems_to_move = (--ndb->monitor.num_subscriptions) - index;
+
+ memmove(&ndb->monitor.subscriptions[index],
+ &ndb->monitor.subscriptions[index+1],
+ elems_to_move * sizeof(*sub));
+
+ return 1;
+}
+
+int ndb_num_subscriptions(struct ndb *ndb)
+{
+ return ndb->monitor.num_subscriptions;
+}
+
uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters)
{
static uint64_t subids = 0;
diff --git a/src/nostrdb.h b/src/nostrdb.h
@@ -493,7 +493,8 @@ void ndb_filter_destroy(struct ndb_filter *);
uint64_t ndb_subscribe(struct ndb *, struct ndb_filter *, int num_filters);
int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
-int ndb_unsubscribe(int subid);
+int ndb_unsubscribe(struct ndb *, uint64_t subid);
+int ndb_num_subscriptions(struct ndb *);
// FULLTEXT SEARCH
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
diff --git a/test.c b/test.c
@@ -44,7 +44,6 @@ static void print_search(struct ndb_txn *txn, struct ndb_search *search)
printf("\n");
}
-
static void test_filters()
{
struct ndb_filter filter, *f;
@@ -1409,6 +1408,11 @@ static void test_subscriptions()
assert((note = ndb_get_note_by_key(&txn, note_id, NULL)));
assert(!strcmp(ndb_note_content(note), "test"));
+ // unsubscribe
+ assert(ndb_num_subscriptions(ndb) == 1);
+ assert(ndb_unsubscribe(ndb, subid));
+ assert(ndb_num_subscriptions(ndb) == 0);
+
ndb_end_query(&txn);
ndb_destroy(ndb);
}