commit dacade299d245d444b1de6381105d73eb4c412e8
parent cdacbcfdca3b255876c154dc87a4b51fdc51aada
Author: William Casarin <jb55@jb55.com>
Date:   Sun, 24 Sep 2023 17:06:04 -0700
ndb: bump nostrdb to support client->relay note processing
Diffstat:
2 files changed, 180 insertions(+), 66 deletions(-)
diff --git a/nostrdb/nostrdb.c b/nostrdb/nostrdb.c
@@ -414,7 +414,8 @@ enum ndb_writer_msgtype {
 
 struct ndb_ingester_event {
 	char *json;
-	int len;
+	unsigned client : 1; // ["EVENT", {...}] messages
+	unsigned len : 31;
 };
 
 struct ndb_writer_note {
@@ -730,6 +731,41 @@ static int ndb_process_profile_note(struct ndb_note *note,
 	return 1;
 }
 
+static int ndb_ingester_process_note(secp256k1_context *ctx,
+				     struct ndb_note *note,
+				     size_t note_size,
+				     struct ndb_writer_msg *out)
+{
+	// Verify! If it's an invalid note we don't need to
+	// bothter writing it to the database
+	if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
+		ndb_debug("signature verification failed\n");
+		return 0;
+	}
+
+	// we didn't find anything. let's send it
+	// to the writer thread
+	note = realloc(note, note_size);
+	assert(((uint64_t)note % 4) == 0);
+
+	if (note->kind == 0) {
+		struct ndb_profile_record_builder *b = 
+			&out->profile.record;
+
+		ndb_process_profile_note(note, b);
+
+		out->type = NDB_WRITER_PROFILE;
+		out->profile.note.note = note;
+		out->profile.note.note_len = note_size;
+	} else {
+		out->type = NDB_WRITER_NOTE;
+		out->note.note = note;
+		out->note.note_len = note_size;
+	}
+
+	return 1;
+}
+
 
 static int ndb_ingester_process_event(secp256k1_context *ctx,
 				      struct ndb_ingester *ingester,
@@ -739,12 +775,16 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 				      )
 {
 	struct ndb_tce tce;
+	struct ndb_fce fce;
 	struct ndb_note *note;
 	struct ndb_ingest_controller controller;
 	struct ndb_id_cb cb;
 	void *buf;
+	int ok;
 	size_t bufsize, note_size;
 
+	ok = 0;
+
 	// we will use this to check if we already have it in the DB during
 	// ID parsing
 	controller.read_txn = read_txn;
@@ -762,6 +802,8 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 	}
 
 	note_size =
+		ev->client ? 
+		ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
 		ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
 
 	if (note_size == -42) {
@@ -775,54 +817,51 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 
 	//ndb_debug("parsed evtype:%d '%.*s'\n", tce.evtype, ev->len, ev->json);
 
-	switch (tce.evtype) {
-	case NDB_TCE_NOTICE: goto cleanup;
-	case NDB_TCE_EOSE:   goto cleanup;
-	case NDB_TCE_OK:     goto cleanup;
-	case NDB_TCE_EVENT:
-		note = tce.event.note;
-		if (note != buf) {
-			ndb_debug("note buffer not equal to malloc'd buffer\n");
-			goto cleanup;
-		}
+	if (ev->client) {
+		switch (fce.evtype) {
+		case NDB_FCE_EVENT:
+			note = fce.event.note;
+			if (note != buf) {
+				ndb_debug("note buffer not equal to malloc'd buffer\n");
+				goto cleanup;
+			}
 
-		// Verify! If it's an invalid note we don't need to
-		// bothter writing it to the database
-		if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
-			ndb_debug("signature verification failed\n");
-			goto cleanup;
+			if (!ndb_ingester_process_note(ctx, note, note_size, out))
+				goto cleanup;
+			else {
+				// we're done with the original json, free it
+				free(ev->json);
+				return 1;
+			}
 		}
+	} else {
+		switch (tce.evtype) {
+		case NDB_TCE_NOTICE: goto cleanup;
+		case NDB_TCE_EOSE:   goto cleanup;
+		case NDB_TCE_OK:     goto cleanup;
+		case NDB_TCE_EVENT:
+			note = tce.event.note;
+			if (note != buf) {
+				ndb_debug("note buffer not equal to malloc'd buffer\n");
+				goto cleanup;
+			}
 
-		// we didn't find anything. let's send it
-		// to the writer thread
-		note = realloc(note, note_size);
-		assert(((uint64_t)note % 4) == 0);
-
-		if (note->kind == 0) {
-			struct ndb_profile_record_builder *b = 
-				&out->profile.record;
-
-			ndb_process_profile_note(note, b);
-
-			out->type = NDB_WRITER_PROFILE;
-			out->profile.note.note = note;
-			out->profile.note.note_len = note_size;
-		} else {
-			out->type = NDB_WRITER_NOTE;
-			out->note.note = note;
-			out->note.note_len = note_size;
+			if (!ndb_ingester_process_note(ctx, note, note_size, out))
+				goto cleanup;
+			else {
+				// we're done with the original json, free it
+				free(ev->json);
+				return 1;
+			}
 		}
-
-		// there's nothing left to do with the original json, so free it
-		free(ev->json);
-		return 1;
 	}
 
+
 cleanup:
 	free(ev->json);
 	free(buf);
 
-	return 0;
+	return ok;
 }
 
 static uint64_t ndb_get_last_key(MDB_txn *txn, MDB_dbi db)
@@ -1089,7 +1128,7 @@ static void ndb_write_version(struct ndb_lmdb *lmdb, MDB_txn *txn, uint64_t vers
 		return;
 	}
 
-	fprintf(stderr, "writing version %" PRIu64 "\n", version);
+	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
 }
 
 static void *ndb_writer_thread(void *data)
@@ -1317,13 +1356,14 @@ static int ndb_ingester_destroy(struct ndb_ingester *ingester)
 }
 
 static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
-				    char *json, int len)
+				    char *json, unsigned len, unsigned client)
 {
 	struct ndb_ingester_msg msg;
 	msg.type = NDB_INGEST_EVENT;
 
 	msg.event.json = json;
 	msg.event.len = len;
+	msg.event.client = client;
 
 	return threadpool_dispatch(&ingester->tp, &msg);
 }
@@ -1431,7 +1471,7 @@ static int ndb_run_migrations(struct ndb *ndb)
 	latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
 
 	if ((version = ndb_db_version(ndb)) == -1) {
-		fprintf(stderr, "run_migrations: no version found, assuming new db\n");
+		ndb_debug("run_migrations: no version found, assuming new db\n");
 		version = latest_version;
 
 		// no version found. fresh db?
@@ -1442,11 +1482,11 @@ static int ndb_run_migrations(struct ndb *ndb)
 
 		return 1;
 	} else {
-		fprintf(stderr, "ndb: version %" PRIu64 " found\n", version);
+		ndb_debug("ndb: version %" PRIu64 " found\n", version);
 	}
 
 	if (version < latest_version)
-		fprintf(stderr, "nostrdb: migrating v%d -> v%d\n",
+		ndb_debug("nostrdb: migrating v%d -> v%d\n",
 				(int)version, (int)latest_version);
 
 	for (i = version; i < latest_version; i++) {
@@ -1516,7 +1556,29 @@ void ndb_destroy(struct ndb *ndb)
 	free(ndb);
 }
 
-// Process a nostr event, ie: ["EVENT", "subid", {"content":"..."}...]
+// Process a nostr event from a client
+//
+// ie: ["EVENT", {"content":"..."} ...]
+//
+// The client-sent variation of ndb_process_event
+int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
+{
+	// Since we need to return as soon as possible, and we're not
+	// making any assumptions about the lifetime of the string, we
+	// definitely need to copy the json here. In the future once we
+	// have our thread that manages a websocket connection, we can
+	// avoid the copy and just use the buffer we get from that
+	// thread.
+	char *json_copy = strdupn(json, len);
+	if (json_copy == NULL)
+		return 0;
+
+	return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1);
+}
+
+// Process anostr event from a relay,
+//
+// ie: ["EVENT", "subid", {"content":"..."}...]
 // 
 // This function returns as soon as possible, first copying the passed
 // json and then queueing it up for processing. Worker threads then take
@@ -1543,23 +1605,26 @@ int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
 	if (json_copy == NULL)
 		return 0;
 
-	return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len);
+	return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0);
 }
 
-int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
+
+int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
 {
 	const char *start, *end, *very_end;
 	start = ldjson;
 	end = start + json_len;
 	very_end = ldjson + json_len;
+	int (* process)(struct ndb *, const char *, int);
 #if DEBUG
 	int processed = 0;
 #endif
+	process = client ? ndb_process_client_event : ndb_process_event;
 
 	while ((end = fast_strchr(start, '\n', very_end - start))) {
 		//printf("processing '%.*s'\n", (int)(end-start), start);
-		if (!ndb_process_event(ndb, start, end - start)) {
-			ndb_debug("ndb_process_event failed\n");
+		if (!process(ndb, start, end - start)) {
+			ndb_debug("ndb_process_client_event failed\n");
 			return 0;
 		}
 		start = end + 1;
@@ -1573,6 +1638,16 @@ int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
 	return 1;
 }
 
+int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
+{
+	return _ndb_process_events(ndb, ldjson, json_len, 1);
+}
+
+int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
+{
+	return _ndb_process_events(ndb, ldjson, json_len, 0);
+}
+
 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
 {
 	return cursor_push_u16(cur, tag->count);
@@ -1678,6 +1753,13 @@ static inline int ndb_json_parser_parse(struct ndb_json_parser *p,
 	return 1;
 }
 
+static inline int toksize(jsmntok_t *tok)
+{
+	return tok->end - tok->start;
+}
+
+
+
 static int cursor_push_unescaped_char(struct cursor *cur, char c1, char c2)
 {
 	switch (c2) {
@@ -2074,11 +2156,6 @@ static inline int jsoneq(const char *json, jsmntok_t *tok, int tok_len,
 	return 0;
 }
 
-static inline int toksize(jsmntok_t *tok)
-{
-	return tok->end - tok->start;
-}
-
 static int ndb_builder_finalize_tag(struct ndb_builder *builder,
 				    union ndb_packed_str offset)
 {
@@ -2227,6 +2304,37 @@ static int parse_unsigned_int(const char *start, int len, unsigned int *num)
 	return 1;
 }
 
+int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
+			       unsigned char *buf, int bufsize, struct ndb_id_cb *cb)
+{
+	jsmntok_t *tok = NULL;
+	int tok_len, res;
+	struct ndb_json_parser parser;
+
+	ndb_json_parser_init(&parser, json, len, buf, bufsize);
+
+	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
+		return res;
+
+	if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
+		return 0;
+
+	parser.i = 1;
+	tok = &parser.toks[parser.i++];
+	tok_len = toksize(tok);
+	if (tok->type != JSMN_STRING)
+		return 0;
+
+	if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
+		fce->evtype = NDB_FCE_EVENT;
+		struct ndb_event *ev = &fce->event;
+		return ndb_parse_json_note(&parser, &ev->note);
+	}
+
+	return 0;
+}
+
+
 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
 			   unsigned char *buf, int bufsize,
 			   struct ndb_id_cb *cb)
@@ -2239,22 +2347,12 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
 	tce->subid = "";
 
 	ndb_json_parser_init(&parser, json, len, buf, bufsize);
+
 	if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
 		return res;
 
-	if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY) {
-		/*
-		tok = &parser.toks[parser.json_parser.toknext-1];
-		ndb_debug("failing at not enough takens (%d) or != JSMN_ARRAY @ '%.*s', '%.*s'\n",
-				parser.num_tokens, 10, json + parser.json_parser.pos,
-				toksize(tok), json + tok->start);
-		tok = &parser.toks[parser.json_parser.toknext-2];
-		ndb_debug("failing at not enough takens (%d) or != JSMN_ARRAY @ '%.*s', '%.*s'\n",
-				parser.num_tokens, 10, json + parser.json_parser.pos,
-				toksize(tok), json + tok->start);
-				*/
+	if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
 		return 0;
-	}
 
 	parser.i = 1;
 	tok = &parser.toks[parser.i++];
diff --git a/nostrdb/nostrdb.h b/nostrdb/nostrdb.h
@@ -44,6 +44,11 @@ struct ndb_txn {
 	void *mdb_txn;
 };
 
+// From-client event types
+enum fce_type {
+	NDB_FCE_EVENT = 0x1
+};
+
 // To-client event types
 enum tce_type {
 	NDB_TCE_EVENT  = 0x1,
@@ -80,6 +85,14 @@ struct ndb_command_result {
 };
 
 
+// From-client event
+struct ndb_fce {
+	enum fce_type evtype;
+	union {
+		struct ndb_event event;
+	};
+};
+
 // To-client event
 struct ndb_tce {
 	enum tce_type evtype;
@@ -177,6 +190,8 @@ int ndb_init(struct ndb **ndb, const char *dbdir, size_t mapsize, int ingester_t
 int ndb_db_version(struct ndb *ndb);
 int ndb_process_event(struct ndb *, const char *json, int len);
 int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
+int ndb_process_client_event(struct ndb *, const char *json, int len);
+int ndb_process_client_events(struct ndb *, const char *json, size_t len);
 int ndb_begin_query(struct ndb *, struct ndb_txn *);
 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
 int ndb_search_profile_next(struct ndb_search *search);
@@ -192,6 +207,7 @@ void ndb_destroy(struct ndb *);
 
 // BUILDER
 int ndb_parse_json_note(struct ndb_json_parser *, struct ndb_note **);
+int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce, unsigned char *buf, int bufsize, struct ndb_id_cb *cb);
 int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce, unsigned char *buf, int bufsize, struct ndb_id_cb *);
 int ndb_note_from_json(const char *json, int len, struct ndb_note **, unsigned char *buf, int buflen);
 int ndb_builder_init(struct ndb_builder *builder, unsigned char *buf, int bufsize);