commit e247bbc30d8d6a17a582861219151ed8ca92a501
parent b18c8a7119152923fdcaef99606da4d6f92f737b
Author: William Casarin <jb55@jb55.com>
Date: Sun, 24 Sep 2023 16:21:09 -0700
add ndb_process_client_event(s)
This interface makes more sense for clients sending events to relays and
eventually for initiating subscriptions.
Diffstat:
4 files changed, 180 insertions(+), 66 deletions(-)
diff --git a/nostrdb.c b/nostrdb.c
@@ -414,7 +414,8 @@ enum ndb_writer_msgtype {
struct ndb_ingester_event {
char *json;
- int len;
+ unsigned client : 1; // ["EVENT", {...}] messages
+ unsigned len : 31;
};
struct ndb_writer_note {
@@ -730,6 +731,41 @@ static int ndb_process_profile_note(struct ndb_note *note,
return 1;
}
+static int ndb_ingester_process_note(secp256k1_context *ctx,
+ struct ndb_note *note,
+ size_t note_size,
+ struct ndb_writer_msg *out)
+{
+ // Verify! If it's an invalid note we don't need to
+ // bothter writing it to the database
+ if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
+ ndb_debug("signature verification failed\n");
+ return 0;
+ }
+
+ // we didn't find anything. let's send it
+ // to the writer thread
+ note = realloc(note, note_size);
+ assert(((uint64_t)note % 4) == 0);
+
+ if (note->kind == 0) {
+ struct ndb_profile_record_builder *b =
+ &out->profile.record;
+
+ ndb_process_profile_note(note, b);
+
+ out->type = NDB_WRITER_PROFILE;
+ out->profile.note.note = note;
+ out->profile.note.note_len = note_size;
+ } else {
+ out->type = NDB_WRITER_NOTE;
+ out->note.note = note;
+ out->note.note_len = note_size;
+ }
+
+ return 1;
+}
+
static int ndb_ingester_process_event(secp256k1_context *ctx,
struct ndb_ingester *ingester,
@@ -739,12 +775,16 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
)
{
struct ndb_tce tce;
+ struct ndb_fce fce;
struct ndb_note *note;
struct ndb_ingest_controller controller;
struct ndb_id_cb cb;
void *buf;
+ int ok;
size_t bufsize, note_size;
+ ok = 0;
+
// we will use this to check if we already have it in the DB during
// ID parsing
controller.read_txn = read_txn;
@@ -762,6 +802,8 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
}
note_size =
+ ev->client ?
+ ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
if (note_size == -42) {
@@ -775,54 +817,51 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
//ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json);
- switch (tce.evtype) {
- case NDB_TCE_NOTICE: goto cleanup;
- case NDB_TCE_EOSE: goto cleanup;
- case NDB_TCE_OK: goto cleanup;
- case NDB_TCE_EVENT:
- note = tce.event.note;
- if (note != buf) {
- ndb_debug("note buffer not equal to malloc'd buffer\n");
- goto cleanup;
- }
+ if (ev->client) {
+ switch (fce.evtype) {
+ case NDB_FCE_EVENT:
+ note = fce.event.note;
+ if (note != buf) {
+ ndb_debug("note buffer not equal to malloc'd buffer\n");
+ goto cleanup;
+ }
- // Verify! If it's an invalid note we don't need to
- // bothter writing it to the database
- if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
- ndb_debug("signature verification failed\n");
- goto cleanup;
+ if (!ndb_ingester_process_note(ctx, note, note_size, out))
+ goto cleanup;
+ else {
+ // we're done with the original json, free it
+ free(ev->json);
+ return 1;
+ }
}
+ } else {
+ switch (tce.evtype) {
+ case NDB_TCE_NOTICE: goto cleanup;
+ case NDB_TCE_EOSE: goto cleanup;
+ case NDB_TCE_OK: goto cleanup;
+ case NDB_TCE_EVENT:
+ note = tce.event.note;
+ if (note != buf) {
+ ndb_debug("note buffer not equal to malloc'd buffer\n");
+ goto cleanup;
+ }
- // we didn't find anything. let's send it
- // to the writer thread
- note = realloc(note, note_size);
- assert(((uint64_t)note % 4) == 0);
-
- if (note->kind == 0) {
- struct ndb_profile_record_builder *b =
- &out->profile.record;
-
- ndb_process_profile_note(note, b);
-
- out->type = NDB_WRITER_PROFILE;
- out->profile.note.note = note;
- out->profile.note.note_len = note_size;
- } else {
- out->type = NDB_WRITER_NOTE;
- out->note.note = note;
- out->note.note_len = note_size;
+ if (!ndb_ingester_process_note(ctx, note, note_size, out))
+ goto cleanup;
+ else {
+ // we're done with the original json, free it
+ free(ev->json);
+ return 1;
+ }
}
-
- // there's nothing left to do with the original json, so free it
- free(ev->json);
- return 1;
}
+
cleanup:
free(ev->json);
free(buf);
- return 0;
+ return ok;
}
static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
@@ -1317,13 +1356,14 @@ static int ndb_ingester_destroy(struct ndb_ingester *ingester)
}
static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
- char *json, int len)
+ char *json, unsigned len, unsigned client)
{
struct ndb_ingester_msg msg;
msg.type = NDB_INGEST_EVENT;
msg.event.json = json;
msg.event.len = len;
+ msg.event.client = client;
return threadpool_dispatch(&ingester->tp, &msg);
}
@@ -1516,7 +1556,29 @@ void ndb_destroy(struct ndb *ndb)
free(ndb);
}
-// Process a nostr event, ie: ["EVENT", "subid", {"content":"..."}...]
+// Process a nostr event from a client
+//
+// ie: ["EVENT", {"content":"..."} ...]
+//
+// The client-sent variation of ndb_process_event
+int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
+{
+ // Since we need to return as soon as possible, and we're not
+ // making any assumptions about the lifetime of the string, we
+ // definitely need to copy the json here. In the future once we
+ // have our thread that manages a websocket connection, we can
+ // avoid the copy and just use the buffer we get from that
+ // thread.
+ char *json_copy = strdupn(json, len);
+ if (json_copy == NULL)
+ return 0;
+
+ return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1);
+}
+
+// Process anostr event from a relay,
+//
+// ie: ["EVENT", "subid", {"content":"..."}...]
//
// This function returns as soon as possible, first copying the passed
// json and then queueing it up for processing. Worker threads then take
@@ -1543,23 +1605,26 @@ int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
if (json_copy == NULL)
return 0;
- return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len);
+ return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0);
}
-int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
+
+int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
{
const char *start, *end, *very_end;
start = ldjson;
end = start + json_len;
very_end = ldjson + json_len;
+ int (* process)(struct ndb *, const char *, int);
#if DEBUG
int processed = 0;
#endif
+ process = client ? ndb_process_client_event : ndb_process_event;
while ((end = fast_strchr(start, '\n', very_end - start))) {
//printf("processing '%.*s'\n", (int)(end-start), start);
- if (!ndb_process_event(ndb, start, end - start)) {
- ndb_debug("ndb_process_event failed\n");
+ if (!process(ndb, start, end - start)) {
+ ndb_debug("ndb_process_client_event failed\n");
return 0;
}
start = end + 1;
@@ -1573,6 +1638,16 @@ int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
return 1;
}
+int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
+{
+ return _ndb_process_events(ndb, ldjson, json_len, 1);
+}
+
+int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
+{
+ return _ndb_process_events(ndb, ldjson, json_len, 0);
+}
+
static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
{
return cursor_push_u16(cur, tag->count);
@@ -1678,6 +1753,13 @@ static inline int ndb_json_parser_parse(struct ndb_json_parser *p,
return 1;
}
+static inline int toksize(jsmntok_t *tok)
+{
+ return tok->end - tok->start;
+}
+
+
+
static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2)
{
switch (c2) {
@@ -2074,11 +2156,6 @@ static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len,
return 0;
}
-static inline int toksize(jsmntok_t *tok)
-{
- return tok->end - tok->start;
-}
-
static int ndb_builder_finalize_tag(struct ndb_builder *builder,
union ndb_packed_str offset)
{
@@ -2227,6 +2304,37 @@ static int parse_unsigned_int(const char *start, int len, unsigned int *num)
return 1;
}
+int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
+ unsigned char *buf, int bufsize, struct ndb_id_cb *cb)
+{
+ jsmntok_t *tok = NULL;
+ int tok_len, res;
+ struct ndb_json_parser parser;
+
+ ndb_json_parser_init(&parser, json, len, buf, bufsize);
+
+ if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
+ return res;
+
+ if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
+ return 0;
+
+ parser.i = 1;
+ tok = &parser.toks[parser.i++];
+ tok_len = toksize(tok);
+ if (tok->type != JSMN_STRING)
+ return 0;
+
+ if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
+ fce->evtype = NDB_FCE_EVENT;
+ struct ndb_event *ev = &fce->event;
+ return ndb_parse_json_note(&parser, &ev->note);
+ }
+
+ return 0;
+}
+
+
int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
unsigned char *buf, int bufsize,
struct ndb_id_cb *cb)
@@ -2239,22 +2347,12 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
tce->subid = "";
ndb_json_parser_init(&parser, json, len, buf, bufsize);
+
if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
return res;
- if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) {
- /*
- tok = &parser.toks[parser.json_parser.toknext-1];
- ndb_debug("failing at not enough takens (%d) or != JSMN_ARRAY @ '%.*s', '%.*s'\n",
- parser.num_tokens, 10, json + parser.json_parser.pos,
- toksize(tok), json + tok->start);
- tok = &parser.toks[parser.json_parser.toknext-2];
- ndb_debug("failing at not enough takens (%d) or != JSMN_ARRAY @ '%.*s', '%.*s'\n",
- parser.num_tokens, 10, json + parser.json_parser.pos,
- toksize(tok), json + tok->start);
- */
+ if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
return 0;
- }
parser.i = 1;
tok = &parser.toks[parser.i++];
diff --git a/nostrdb.h b/nostrdb.h
@@ -44,6 +44,11 @@ struct ndb_txn {
void *mdb_txn;
};
+// From-client event types
+enum fce_type {
+ NDB_FCE_EVENT = 0x1
+};
+
// To-client event types
enum tce_type {
NDB_TCE_EVENT = 0x1,
@@ -80,6 +85,14 @@ struct ndb_command_result {
};
+// From-client event
+struct ndb_fce {
+ enum fce_type evtype;
+ union {
+ struct ndb_event event;
+ };
+};
+
// To-client event
struct ndb_tce {
enum tce_type evtype;
@@ -177,6 +190,8 @@ int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_t
int ndb_db_version(struct ndb *ndb);
int ndb_process_event(struct ndb *, const char *json, int len);
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
+int ndb_process_client_event(struct ndb *, const char *json, int len);
+int ndb_process_client_events(struct ndb *, const char *json, size_t len);
int ndb_begin_query(struct ndb *, struct ndb_txn *);
int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
int ndb_search_profile_next(struct ndb_search *search);
@@ -192,6 +207,7 @@ void ndb_destroy(struct ndb *);
// BUILDER
int ndb_parse_json_note(struct ndb_json_parser *, struct ndb_note **);
+int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, unsigned char *buf, int bufsize, struct ndb_id_cb *cb);
int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, unsigned char *buf, int bufsize, struct ndb_id_cb *);
int ndb_note_from_json(const char *json, int len, struct ndb_note **, unsigned char *buf, int buflen);
int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, int bufsize);
diff --git a/test.c b/test.c
@@ -95,7 +95,7 @@ static void test_profile_updates()
read_file("testdata/profile-updates.json", (unsigned char*)json, alloc_size, &written);
- assert(ndb_process_events(ndb, json, written));
+ assert(ndb_process_client_events(ndb, json, written));
ndb_destroy(ndb);
diff --git a/testdata/profile-updates.json b/testdata/profile-updates.json
@@ -1,3 +1,3 @@
-["EVENT","a",{"id": "9b2861dda8fc602ec2753f92f1a443c9565de606e0c8f4fd2db4f2506a3b13ca","pubkey": "87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001","created_at": 1695593347,"kind": 0,"tags": [],"content": "{\"name\":\"a\"}","sig": "f48da228f8967d33c3caf0a78f853b5144631eb86c7777fd25949123a5272a92765a0963d4686dd0efe05b7a9b986bfac8d43070b234153acbae5006d5a90f31"}]
-["EVENT","b",{"id": "a44eb8fb6931d6155b04038bef0624407e46c85c61e5758392cbb615f00184ca","pubkey": "87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001","created_at": 1695593354,"kind": 0,"tags": [],"content": "{\"name\":\"b\"}","sig": "7540bbde4b4479275e20d95acaa64027359a73989927f878825093cba2f468bd8e195919a77b4c230acecddf92e6b4bee26918b0c0842f84ec7c1fae82453906"}]
-["EVENT","c",{"id": "3090cdba2889acf1279efbf598e7e1cc701259c329cb11309a495f6c5204a7f6","pubkey": "87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001","created_at": 1695593357,"kind": 0,"tags": [],"content": "{\"name\":\"c\"}","sig": "f143edd4a8e60d882b0a19c7a4ee896a0c99da285fceab1ea704c5a9b6110f7b6983fe3c4a0a316c8ab1aa971323639a6049033c253a8c3684f99f76e781aa7c"}]
+["EVENT",{"id": "9b2861dda8fc602ec2753f92f1a443c9565de606e0c8f4fd2db4f2506a3b13ca","pubkey": "87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001","created_at": 1695593347,"kind": 0,"tags": [],"content": "{\"name\":\"a\"}","sig": "f48da228f8967d33c3caf0a78f853b5144631eb86c7777fd25949123a5272a92765a0963d4686dd0efe05b7a9b986bfac8d43070b234153acbae5006d5a90f31"}]
+["EVENT",{"id": "a44eb8fb6931d6155b04038bef0624407e46c85c61e5758392cbb615f00184ca","pubkey": "87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001","created_at": 1695593354,"kind": 0,"tags": [],"content": "{\"name\":\"b\"}","sig": "7540bbde4b4479275e20d95acaa64027359a73989927f878825093cba2f468bd8e195919a77b4c230acecddf92e6b4bee26918b0c0842f84ec7c1fae82453906"}]
+["EVENT",{"id": "3090cdba2889acf1279efbf598e7e1cc701259c329cb11309a495f6c5204a7f6","pubkey": "87fbc6d59831a823a45d101f86942c41cde29023f4092024a27c50103c154001","created_at": 1695593357,"kind": 0,"tags": [],"content": "{\"name\":\"c\"}","sig": "f143edd4a8e60d882b0a19c7a4ee896a0c99da285fceab1ea704c5a9b6110f7b6983fe3c4a0a316c8ab1aa971323639a6049033c253a8c3684f99f76e781aa7c"}]