nostrdb

an unfairly fast embedded nostr database backed by lmdb
git clone git://jb55.com/nostrdb
Log | Files | Refs | Submodules | README | LICENSE

commit 565e6895915d187fcbb9c05c90895dea4363b8f5
parent 3be8f4f5fee0a3df02d58278cdcfe77cbb3987aa
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 19 Mar 2025 15:12:52 -0700

Initial relay index implementation

Add relay indexing for existing notes

This patch introduces a relay index for new notes and notes that have
already been stored, allowing the database to track additional relay
sources for a given note.

Changes:

- Added `NDB_WRITER_NOTE_RELAY` to handle relay indexing separately from
  new note ingestion.

- Implemented `ndb_write_note_relay()` and
  `ndb_write_note_relay_kind_index()` to store relay URLs.

- Modified `ndb_ingester_process_event()` to check for existing notes
  and append relay info if necessary.

- Introduced `ndb_note_seen_on_relay()` to prevent duplicate relay entries.

- Updated LMDB schema with `NDB_DB_NOTE_RELAYS` (note_id -> relay) and
  `NDB_DB_NOTE_RELAY_KIND` (relay + kind + created_at -> note).

- Refactored `ndb_process_event()` to use `ndb_ingest_meta` for tracking
  relay sources.

- Ensured proper memory management for relay strings in writer thread.

With this change, nostrdb can better track where notes are seen across
different relays, improving query capabilities for relay-based data
retrieval.

Signed-off-by: William Casarin <jb55@jb55.com>

Diffstat:
M .gitignore | 2 ++
M TODO | 5 -----
M src/nostrdb.c | 430 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
M src/nostrdb.h | 17 +++++++++++++++++
M test.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 475 insertions(+), 36 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -1,4 +1,6 @@ tags +build.log +*.swp data.mdb lock.mdb v0-lock diff --git a/TODO b/TODO @@ -1,5 +0,0 @@ -subscription polling -execution plan for created_at query -note kind index rebuild migration -(A) filter from json -tags index migration diff --git a/src/nostrdb.c b/src/nostrdb.c @@ -129,6 +129,8 @@ struct ndb_ingest_controller { MDB_txn *read_txn; struct ndb_lmdb *lmdb; + struct ndb_note *note; + uint64_t note_key; }; enum ndb_writer_msgtype { @@ -139,6 +141,7 @@ enum ndb_writer_msgtype { NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched NDB_WRITER_BLOCKS, // write parsed note blocks NDB_WRITER_MIGRATE, // migrate the database + NDB_WRITER_NOTE_RELAY, // we already have the note, but we have more relays to write }; // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) @@ -1475,6 +1478,7 @@ static int ndb_db_is_index(enum ndb_dbs index) case NDB_DB_NDB_META: case NDB_DB_PROFILE_SEARCH: case NDB_DB_PROFILE_LAST_FETCH: + case NDB_DB_NOTE_RELAYS: case NDB_DBS: return 0; case NDB_DB_PROFILE_PK: @@ -1484,6 +1488,7 @@ static int ndb_db_is_index(enum ndb_dbs index) case NDB_DB_NOTE_TAGS: case NDB_DB_NOTE_PUBKEY: case NDB_DB_NOTE_PUBKEY_KIND: + case NDB_DB_NOTE_RELAY_KIND: return 1; } @@ -1499,6 +1504,125 @@ static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key, key->timestamp = timestamp; } +// formats the relay url buffer for the NDB_DB_NOTE_RELAYS value. 
It's a +// null terminated string padded to 8 bytes (we must keep the entire database +// aligned to 8 bytes at all times) +static int prepare_relay_buf(char *relay_buf, int bufsize, const char *relay, + int relay_len) +{ + struct cursor cur; + + // make sure the size of the buffer is aligned + assert((bufsize % 8) == 0); + + make_cursor((unsigned char *)relay_buf, (unsigned char *)relay_buf + bufsize, &cur); + + // push the relay string + if (!cursor_push(&cur, (unsigned char *)relay, relay_len)) + return 0; + + // relay urls are null terminated for convenience + if (!cursor_push_byte(&cur, 0)) + return 0; + + // align the buffer + if (!cursor_align(&cur, 8)) + return 0; + + return cur.p - cur.start; +} + +// Write to the note_id -> relay_url database. This records where notes +// have been seen +static int ndb_write_note_relay(struct ndb_txn *txn, uint64_t note_key, + const char *relay, int relay_len) +{ + char relay_buf[256]; + int rc, len; + MDB_val k, v; + + if (relay == NULL || relay_len == 0) + return 0; + + if (!(len = prepare_relay_buf(relay_buf, sizeof(relay_buf), relay, relay_len))) { + fprintf(stderr, "relay url '%s' too large when writing note relay index\n", relay); + return 0; + } + + assert((len % 8) == 0); + + k.mv_data = &note_key; + k.mv_size = sizeof(note_key); + + v.mv_data = relay_buf; + v.mv_size = len; + + // NODUPDATA is specified so that we don't accidently add duplicate + // key/value pairs + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], + &k, &v, MDB_NODUPDATA))) + { + ndb_debug("ndb_write_note_relay failed: %s\n", mdb_strerror(rc)); + return 0; + } + + return 1; +} + +static int ndb_write_note_relay_kind_index(struct ndb_txn *txn, + uint64_t kind, + uint64_t note_key, + uint64_t created_at, + const char *relay, + int relay_len) +{ + // The relay kind key has a layout like so + // + // - note_key: 00 + 8 bytes + // - kind: 08 + 8 bytes + // - created_at: 16 + 8 bytes + // - relay_url_size: 24 + 1 byte + // - 
relay_url: 25 + n byte null-terminated string + // - pad to 8 byte alignment + + unsigned char buf[256]; + int rc; + struct cursor cur; + MDB_val k, v; + + // come on bro + if (relay_len > 248) + return 0; + + if (relay == NULL || relay_len == 0) + return 0; + + make_cursor(buf, buf + sizeof(buf), &cur); + + if (!cursor_push(&cur, (unsigned char *)&note_key, 8)) return 0; + if (!cursor_push(&cur, (unsigned char *)&kind, 8)) return 0; + if (!cursor_push(&cur, (unsigned char *)&created_at, 8)) return 0; + if (!cursor_push_byte(&cur, (uint8_t)relay_len)) return 0; + if (!cursor_push(&cur, (unsigned char *)relay, relay_len)) return 0; + if (!cursor_align(&cur, 8)) return 0; + + assert(((cur.p-cur.start)%8) == 0); + + k.mv_data = cur.start; + k.mv_size = cur.p - cur.start; + + v.mv_data = NULL; + v.mv_size = 0; + + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], &k, &v, 0))) { + fprintf(stderr, "write note relay kind index failed: %s\n", + mdb_strerror(rc)); + return 0; + } + + return 1; +} + static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t note_key) { @@ -1601,6 +1725,7 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, case NDB_DB_NDB_META: case NDB_DB_PROFILE_SEARCH: case NDB_DB_PROFILE_LAST_FETCH: + case NDB_DB_NOTE_RELAYS: case NDB_DBS: // this should never happen since we check at // the start @@ -1620,6 +1745,9 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, goto cleanup; } break; + case NDB_DB_NOTE_RELAY_KIND: + fprintf(stderr, "it doesn't make sense to rebuild note relay kind index\n"); + return 0; case NDB_DB_NOTE_PUBKEY_KIND: if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) { count = -1; @@ -1817,14 +1945,23 @@ enum ndb_ingester_msgtype { }; struct ndb_ingester_event { + const char *relay; char *json; unsigned client : 1; // ["EVENT", {...}] messages unsigned len : 31; }; +struct ndb_writer_note_relay { + const char 
*relay; + uint64_t note_key; + uint64_t kind; + uint64_t created_at; +}; + struct ndb_writer_note { struct ndb_note *note; size_t note_len; + const char *relay; }; struct ndb_writer_profile { @@ -1862,6 +1999,7 @@ struct ndb_writer_blocks { struct ndb_writer_msg { enum ndb_writer_msgtype type; union { + struct ndb_writer_note_relay note_relay; struct ndb_writer_note note; struct ndb_writer_profile profile; struct ndb_writer_ndb_meta ndb_meta; @@ -2075,6 +2213,7 @@ static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v) return 1; } + // get some value based on a clustered id key int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id, MDB_val *val) @@ -2241,9 +2380,10 @@ static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid hex_decode(hexid, 64, id, sizeof(id)); // let's see if we already have it - ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn); - if (!ndb_has_note(&txn, id)) + c->note = ndb_get_note_by_id(&txn, id, NULL, &c->note_key); + + if (c->note == NULL) return NDB_IDRES_CONT; return NDB_IDRES_STOP; @@ -2330,7 +2470,8 @@ int ndb_process_profile_note(struct ndb_note *note, } static int ndb_ingester_queue_event(struct ndb_ingester *ingester, - char *json, unsigned len, unsigned client) + char *json, unsigned len, + unsigned client, const char *relay) { struct ndb_ingester_msg msg; msg.type = NDB_INGEST_EVENT; @@ -2338,14 +2479,22 @@ static int ndb_ingester_queue_event(struct ndb_ingester *ingester, msg.event.json = json; msg.event.len = len; msg.event.client = client; + msg.event.relay = relay; return threadpool_dispatch(&ingester->tp, &msg); } +void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay) +{ + meta->client = client; + meta->relay = relay; +} static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json, - int len, unsigned client) + int len, struct ndb_ingest_meta *meta) { + const char *relay = meta->relay; + // Without this, we get bus 
errors in the json parser inside when // trying to ingest empty kind 6 reposts... we should probably do fuzz // testing on inputs to the json parser @@ -2362,7 +2511,13 @@ static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json, if (json_copy == NULL) return 0; - return ndb_ingester_queue_event(ingester, json_copy, len, client); + if (relay != NULL) { + relay = strdup(meta->relay); + if (relay == NULL) + return 0; + } + + return ndb_ingester_queue_event(ingester, json_copy, len, meta->client, relay); } @@ -2370,9 +2525,12 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, struct ndb_note *note, size_t note_size, struct ndb_writer_msg *out, - struct ndb_ingester *ingester) + struct ndb_ingester *ingester, + const char *relay) { enum ndb_ingest_filter_action action; + struct ndb_ingest_meta meta; + action = NDB_INGEST_ACCEPT; if (ingester->filter) @@ -2412,24 +2570,81 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, } else if (note->kind == 6) { // process the repost if we have a repost event ndb_debug("processing kind 6 repost\n"); + // dup the relay string + ndb_ingest_meta_init(&meta, 0, relay); ndb_ingest_event(ingester, ndb_note_content(note), - ndb_note_content_length(note), 0); + ndb_note_content_length(note), + &meta); } out->type = NDB_WRITER_NOTE; out->note.note = note; out->note.note_len = note_size; + out->note.relay = relay; return 1; } +int ndb_note_seen_on_relay(struct ndb_txn *txn, uint64_t note_key, const char *relay) +{ + MDB_val k, v; + MDB_cursor *cur; + int rc, len; + char relay_buf[256]; + + if (relay == NULL) + return 0; + + len = strlen(relay); + + if (!(len = prepare_relay_buf(relay_buf, sizeof(relay_buf), relay, len))) + return 0; + + assert((len % 8) == 0); + + k.mv_data = &note_key; + k.mv_size = sizeof(note_key); + + v.mv_data = relay_buf; + v.mv_size = len; + + if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], &cur)) != MDB_SUCCESS) + return 0; + + rc = 
mdb_cursor_get(cur, &k, &v, MDB_GET_BOTH); + mdb_cursor_close(cur); + + return rc == MDB_SUCCESS; +} + +// process the relay for the note. this is called when we already have the +// note in the database but still need to check if the relay needs to be +// written to the relay indexes for corresponding note +static int ndb_process_note_relay(struct ndb_txn *txn, struct ndb_writer_msg *out, + uint64_t note_key, struct ndb_note *note, + const char *relay) +{ + // query to see if we already have the relay on this note + if (ndb_note_seen_on_relay(txn, note_key, relay)) { + return 0; + } + + // if not, tell the writer thread to emit a NOTE_RELAY event + out->type = NDB_WRITER_NOTE_RELAY; + + out->note_relay.relay = relay; + out->note_relay.note_key = note_key; + out->note_relay.kind = ndb_note_kind(note); + out->note_relay.created_at = ndb_note_created_at(note); + + return 1; +} static int ndb_ingester_process_event(secp256k1_context *ctx, struct ndb_ingester *ingester, struct ndb_ingester_event *ev, struct ndb_writer_msg *out, - MDB_txn *read_txn - ) + MDB_txn *read_txn) { struct ndb_tce tce; struct ndb_fce fce; @@ -2463,10 +2678,29 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) : ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb); + // This is a result from our special json parser. It parsed the id + // and found that we already have it in the database if ((int)note_size == -42) { - // we already have this! 
- //ndb_debug("already have id??\n"); - goto cleanup; + assert(controller.note != NULL); + assert(controller.note_key != 0); + struct ndb_txn txn; + ndb_txn_from_mdb(&txn, ingester->lmdb, read_txn); + + // we still need to process the relays on the note even + // if we already have it + if (ev->relay && ndb_process_note_relay(&txn, out, + controller.note_key, + controller.note, + ev->relay)) + { + // free note buf here since we don't pass the note to the writer thread + free(buf); + goto success; + } else { + // we already have the note and there are no new + // relays to process. nothing to write. + goto cleanup; + } } else if (note_size == 0) { ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json); goto cleanup; @@ -2484,13 +2718,12 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, } if (!ndb_ingester_process_note(ctx, note, note_size, - out, ingester)) { + out, ingester, + ev->relay)) { ndb_debug("failed to process note\n"); goto cleanup; } else { - // we're done with the original json, free it - free(ev->json); - return 1; + goto success; } } } else { @@ -2507,20 +2740,26 @@ static int ndb_ingester_process_event(secp256k1_context *ctx, } if (!ndb_ingester_process_note(ctx, note, note_size, - out, ingester)) { + out, ingester, + ev->relay)) { ndb_debug("failed to process note\n"); goto cleanup; } else { - // we're done with the original json, free it - free(ev->json); - return 1; + goto success; } } } +success: + free(ev->json); + // we don't free relay or buf since those are passed to the writer thread + return 1; + cleanup: free(ev->json); + if (ev->relay) + free((void*)ev->relay); free(buf); return ok; @@ -2628,6 +2867,68 @@ retry: return 1; } +// +// The relay kind index has a layout like so (so we don't need dupsort) +// +// - note_id: 00 + 8 bytes +// - kind: 08 + 8 bytes +// - created_at: 16 + 8 bytes +// - relay_url_size: 24 + 1 byte +// - relay_url: 25 + n byte null-terminated string +// - pad to 8 byte alignment +// +// The key sort 
order is: +// +// relay_url, kind, created_at +// +static int ndb_relay_kind_cmp(const MDB_val *a, const MDB_val *b) +{ + int cmp; + MDB_val va, vb; + uint64_t iva, ivb; + unsigned char *ad = (unsigned char *)a->mv_data; + unsigned char *bd = (unsigned char *)b->mv_data; + assert(((uint64_t)a->mv_data % 8) == 0); + + va.mv_size = *(ad + 24); + va.mv_data = ad + 25; + + vb.mv_size = *(bd + 24); + vb.mv_data = bd + 25; + + cmp = mdb_cmp_memn(&va, &vb); + if (cmp) return cmp; + + // kind + iva = *(uint64_t*)(ad + 8); + ivb = *(uint64_t*)(bd + 8); + + if (iva < ivb) + return -1; + else if (iva > ivb) + return 1; + + // created_at + iva = *(uint64_t*)(ad + 16); + ivb = *(uint64_t*)(bd + 16); + + if (iva < ivb) + return -1; + else if (iva > ivb) + return 1; + + // note_id (so we don't need dupsort logic) + iva = *(uint64_t*)ad; + ivb = *(uint64_t*)bd; + + if (iva < ivb) + return -1; + else if (iva > ivb) + return 1; + + return 0; +} + static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b) { int cmp; @@ -4356,7 +4657,7 @@ static uint64_t ndb_write_note(struct ndb_txn *txn, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags) { - int rc; + int rc, relay_len = 0; uint64_t note_key, kind; MDB_dbi note_db; MDB_val key, val; @@ -4384,11 +4685,18 @@ static uint64_t ndb_write_note(struct ndb_txn *txn, return 0; } + if (note->relay != NULL) + relay_len = strlen(note->relay); + ndb_write_note_id_index(txn, note->note, note_key); ndb_write_note_kind_index(txn, note->note, note_key); ndb_write_note_tag_index(txn, note->note, note_key); ndb_write_note_pubkey_index(txn, note->note, note_key); ndb_write_note_pubkey_kind_index(txn, note->note, note_key); + ndb_write_note_relay_kind_index(txn, kind, note_key, + ndb_note_created_at(note->note), + note->relay, relay_len); + ndb_write_note_relay(txn, note_key, note->relay, relay_len); // only parse content and do fulltext index on text and longform notes if (kind == 1 || kind == 30023) { @@ -4562,7 +4870,7 @@ static 
void *ndb_writer_thread(void *data) struct ndb_writer *writer = data; struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; struct written_note written_notes[THREAD_QUEUE_BATCH]; - int i, popped, done, needs_commit, num_notes; + int i, popped, done, relay_len, needs_commit, num_notes; uint64_t note_nkey; struct ndb_txn txn; unsigned char *scratch; @@ -4590,6 +4898,7 @@ static void *ndb_writer_thread(void *data) case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; case NDB_WRITER_BLOCKS: needs_commit = 1; break; case NDB_WRITER_MIGRATE: needs_commit = 1; break; + case NDB_WRITER_NOTE_RELAY: needs_commit = 1; break; case NDB_WRITER_QUIT: break; } } @@ -4643,6 +4952,20 @@ static void *ndb_writer_thread(void *data) }; } break; + case NDB_WRITER_NOTE_RELAY: + relay_len = strlen(msg->note_relay.relay); + ndb_write_note_relay(&txn, + msg->note_relay.note_key, + msg->note_relay.relay, + relay_len); + ndb_write_note_relay_kind_index( + &txn, + msg->note_relay.kind, + msg->note_relay.note_key, + msg->note_relay.created_at, + msg->note_relay.relay, + relay_len); + break; case NDB_WRITER_DBMETA: ndb_write_version(&txn, msg->ndb_meta.version); break; @@ -4683,11 +5006,15 @@ static void *ndb_writer_thread(void *data) msg = &msgs[i]; if (msg->type == NDB_WRITER_NOTE) { free(msg->note.note); + if (msg->note.relay) + free((void*)msg->note.relay); } else if (msg->type == NDB_WRITER_PROFILE) { free(msg->profile.note.note); //ndb_profile_record_builder_free(&msg->profile.record); - } else if (msg->type == NDB_WRITER_BLOCKS) { + } else if (msg->type == NDB_WRITER_BLOCKS) { ndb_blocks_free(msg->blocks.blocks); + } else if (msg->type == NDB_WRITER_NOTE_RELAY) { + free((void*)msg->note_relay.relay); } } } @@ -4928,6 +5255,20 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map return 0; } + // relay kind index. 
maps <relay_url><kind><created><note_id> primary keys to relay records + // see ndb_relay_kind_cmp function for more details on the key format + if ((rc = mdb_dbi_open(txn, "relay_kind", MDB_CREATE, &lmdb->dbs[NDB_DB_NOTE_RELAY_KIND]))) { + fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); + return 0; + } + mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], ndb_relay_kind_cmp); + + // note_id -> relay index + if ((rc = mdb_dbi_open(txn, "note_relays", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, &lmdb->dbs[NDB_DB_NOTE_RELAYS]))) { + fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc); + return 0; + } + // id+ts index flags unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; @@ -5109,6 +5450,7 @@ void ndb_destroy(struct ndb *ndb) free(ndb); } + // Process a nostr event from a client // // ie: ["EVENT", {"content":"..."} ...] @@ -5116,7 +5458,10 @@ void ndb_destroy(struct ndb *ndb) // The client-sent variation of ndb_process_event int ndb_process_client_event(struct ndb *ndb, const char *json, int len) { - return ndb_ingest_event(&ndb->ingester, json, len, 1); + struct ndb_ingest_meta meta; + ndb_ingest_meta_init(&meta, 1, NULL); + + return ndb_ingest_event(&ndb->ingester, json, len, &meta); } // Process anostr event from a relay, @@ -5138,25 +5483,32 @@ int ndb_process_client_event(struct ndb *ndb, const char *json, int len) // int ndb_process_event(struct ndb *ndb, const char *json, int json_len) { - return ndb_ingest_event(&ndb->ingester, json, json_len, 0); + struct ndb_ingest_meta meta; + ndb_ingest_meta_init(&meta, 0, NULL); + + return ndb_ingest_event(&ndb->ingester, json, json_len, &meta); } +int ndb_process_event_with(struct ndb *ndb, const char *json, int json_len, + struct ndb_ingest_meta *meta) +{ + return ndb_ingest_event(&ndb->ingester, json, json_len, meta); +} -int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client) +int _ndb_process_events(struct ndb *ndb, const char 
*ldjson, size_t json_len, + struct ndb_ingest_meta *meta) { const char *start, *end, *very_end; start = ldjson; end = start + json_len; very_end = ldjson + json_len; - int (* process)(struct ndb *, const char *, int); #if DEBUG int processed = 0; #endif - process = client ? ndb_process_client_event : ndb_process_event; while ((end = fast_strchr(start, '\n', very_end - start))) { //printf("processing '%.*s'\n", (int)(end-start), start); - if (!process(ndb, start, end - start)) { + if (!ndb_process_event_with(ndb, start, end - start, meta)) { ndb_debug("ndb_process_client_event failed\n"); return 0; } @@ -5194,14 +5546,26 @@ int ndb_process_events_stream(struct ndb *ndb, FILE* fp) } #endif +int ndb_process_events_with(struct ndb *ndb, const char *ldjson, size_t json_len, + struct ndb_ingest_meta *meta) +{ + return _ndb_process_events(ndb, ldjson, json_len, meta); +} + int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len) { - return _ndb_process_events(ndb, ldjson, json_len, 1); + struct ndb_ingest_meta meta; + ndb_ingest_meta_init(&meta, 1, NULL); + + return _ndb_process_events(ndb, ldjson, json_len, &meta); } int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len) { - return _ndb_process_events(ndb, ldjson, json_len, 0); + struct ndb_ingest_meta meta; + ndb_ingest_meta_init(&meta, 0, NULL); + + return _ndb_process_events(ndb, ldjson, json_len, &meta); } static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag) @@ -7086,6 +7450,10 @@ const char *ndb_db_name(enum ndb_dbs db) return "note_pubkey_index"; case NDB_DB_NOTE_PUBKEY_KIND: return "note_pubkey_kind_index"; + case NDB_DB_NOTE_RELAY_KIND: + return "note_relay_kind_index"; + case NDB_DB_NOTE_RELAYS: + return "note_relays"; case NDB_DBS: return "count"; } diff --git a/src/nostrdb.h b/src/nostrdb.h @@ -55,6 +55,11 @@ struct ndb_str { }; }; +struct ndb_ingest_meta { + unsigned client; + const char *relay; +}; + struct ndb_keypair { unsigned char 
pubkey[32]; unsigned char secret[32]; @@ -189,6 +194,8 @@ enum ndb_dbs { NDB_DB_NOTE_TAGS, // note tags index NDB_DB_NOTE_PUBKEY, // note pubkey index NDB_DB_NOTE_PUBKEY_KIND, // note pubkey kind index + NDB_DB_NOTE_RELAY_KIND, // relay+kind+created -> note_id + NDB_DB_NOTE_RELAYS, // note_id -> relays NDB_DBS, }; @@ -470,14 +477,23 @@ int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[3 // NDB int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *); int ndb_db_version(struct ndb_txn *txn); + +// NOTE PROCESSING int ndb_process_event(struct ndb *, const char *json, int len); +void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay); +// Process an event, recording the relay where it came from. +int ndb_process_event_with(struct ndb *, const char *json, int len, struct ndb_ingest_meta *meta); int ndb_process_events(struct ndb *, const char *ldjson, size_t len); +int ndb_process_events_with(struct ndb *ndb, const char *ldjson, size_t json_len, struct ndb_ingest_meta *meta); #ifndef _WIN32 // TODO: fix on windows int ndb_process_events_stream(struct ndb *, FILE* fp); #endif +// deprecated: use ndb_ingest_event_with int ndb_process_client_event(struct ndb *, const char *json, int len); +// deprecated: use ndb_ingest_events_with int ndb_process_client_events(struct ndb *, const char *json, size_t len); + int ndb_begin_query(struct ndb *, struct ndb_txn *); int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query); int ndb_search_profile_next(struct ndb_search *search); @@ -492,6 +508,7 @@ uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char * struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *primkey); struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len); void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len); +int 
ndb_note_seen_on_relay(struct ndb_txn *txn, uint64_t note_key, const char *relay); void ndb_destroy(struct ndb *); // BUILDER diff --git a/test.c b/test.c @@ -1779,7 +1779,64 @@ static void test_filter_parse_search_json() { assert(!strcmp((const char*)buf, json)); } +static void test_note_relay_index() +{ + struct ndb *ndb; + struct ndb_txn txn; + struct ndb_config config; + struct ndb_note *note; + struct ndb_filter filter, *f = &filter; + uint64_t note_id, subid; + struct ndb_ingest_meta meta; + + const char *json = "[\"EVENT\",{\"id\": \"0f20295584a62d983a4fa85f7e50b460cd0049f94d8cd250b864bb822a747114\",\"pubkey\": \"55c882cf4a255ac66fc8507e718a1d1283ba46eb7d678d0573184dada1a4f376\",\"created_at\": 1742498339,\"kind\": 1,\"tags\": [],\"content\": \"hi\",\"sig\": \"ae1218280f554ea0b04ae09921031493d60fb7831dfd2dbd7086efeace2719a46842ce80342ebc002da8943df02e98b8b4abb4629c7103ca2114e6c4425f97fe\"}]"; + + // Initialize NDB + ndb_default_config(&config); + assert(ndb_init(&ndb, test_dir, &config)); + + // 1) Ingest the note from “relay1”. + // Use ndb_ingest_meta_init to record the relay. + + assert(ndb_filter_init(f)); + assert(ndb_filter_start_field(f, NDB_FILTER_KINDS)); + assert(ndb_filter_add_int_element(f, 1)); + ndb_filter_end_field(f); + ndb_filter_end(f); + + assert((subid = ndb_subscribe(ndb, f, 1))); + + ndb_ingest_meta_init(&meta, 1, "wss://relay.damus.io"); + assert(ndb_process_event_with(ndb, json, strlen(json), &meta)); + + assert(ndb_wait_for_notes(ndb, subid, &note_id, 1) == 1); + assert(note_id > 0); + assert(ndb_begin_query(ndb, &txn)); + + assert((note = ndb_get_note_by_key(&txn, note_id, NULL))); + + ndb_end_query(&txn); + + // 3) Ingest it again from a new relay: “relay2” + ndb_ingest_meta_init(&meta, 1, "wss://relay.mit.edu"); + assert(ndb_process_event_with(ndb, json, strlen(json), &meta)); + + // TODO: subscribe to this somehow if we have a relay filter? 
+ sleep(1); + + // 4) Check that we have both relays + assert(ndb_begin_query(ndb, &txn)); + assert(ndb_note_seen_on_relay(&txn, note_id, "wss://relay.damus.io")); + assert(ndb_note_seen_on_relay(&txn, note_id, "wss://relay.mit.edu")); + + // Cleanup + ndb_destroy(ndb); + + printf("test_note_relay_index passed!\n"); +} + int main(int argc, const char *argv[]) { + test_note_relay_index(); test_filter_search(); test_filter_parse_search_json(); test_parse_filter_json();