commit b654dc68ec6106a148ded3de027527145993728e
parent 6779a218652f4dc9f41d85a48756e5adf1f0ef42
Author: William Casarin <jb55@jb55.com>
Date: Fri, 21 Mar 2025 13:29:17 -0700
feat: add relay index and ingestion metadata
- Introduce `IngestMetadata` for capturing additional context (e.g.
relay info) when ingesting events.
- Add `process_event_with` to accept metadata, deprecating
`process_event` and `process_client_event`.
- Implement `NoteRelays` and its iterator for retrieving all relays
associated with a note.
Signed-off-by: William Casarin <jb55@jb55.com>
Diffstat:
6 files changed, 230 insertions(+), 25 deletions(-)
diff --git a/src/error.rs b/src/error.rs
@@ -27,6 +27,9 @@ pub enum Error {
#[error("Buffer overflow")]
BufferOverflow,
+ #[error("CString failed")]
+ CString(#[from] std::ffi::NulError),
+
#[error("Filter error: {0}")]
Filter(#[from] FilterError),
diff --git a/src/ingest.rs b/src/ingest.rs
@@ -0,0 +1,49 @@
+use crate::bindings;
+use std::ffi::CString;
+
+pub struct IngestMetadata {
+ meta: bindings::ndb_ingest_meta,
+ relay: Option<CString>,
+}
+
+impl Default for IngestMetadata {
+ fn default() -> Self {
+ Self {
+ relay: None,
+ meta: bindings::ndb_ingest_meta {
+ client: 0,
+ relay: std::ptr::null_mut(),
+ },
+ }
+ }
+}
+
+impl IngestMetadata {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Ingest a relay-sent event in the form `["EVENT", {"id:"...}]`
+ pub fn client(mut self, from_client: bool) -> Self {
+ self.meta.client = if from_client { 1 } else { 0 };
+ self
+ }
+
+ fn relay_str(&self) -> *const ::std::os::raw::c_char {
+ match &self.relay {
+ Some(relay_cstr) => relay_cstr.as_ptr(),
+ None => std::ptr::null(),
+ }
+ }
+
+ pub fn as_mut_ptr(&mut self) -> *mut bindings::ndb_ingest_meta {
+ // update the ingest relay str with our cstr if we have one
+ self.meta.relay = self.relay_str();
+ &mut self.meta
+ }
+
+ pub fn relay(mut self, relay: &str) -> Self {
+ self.relay = Some(CString::new(relay).expect("should never happen"));
+ self
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
@@ -18,11 +18,13 @@ mod future;
mod config;
mod error;
mod filter;
+mod ingest;
mod ndb;
mod ndb_str;
mod note;
mod profile;
mod query;
+mod relay;
mod result;
mod subscription;
mod tags;
@@ -35,12 +37,14 @@ pub use error::{Error, FilterError};
pub use filter::{Filter, FilterBuilder, FilterElement, FilterField, MutFilterField};
pub(crate) use future::SubscriptionState;
pub use future::SubscriptionStream;
+pub use ingest::IngestMetadata;
pub use ndb::Ndb;
pub use ndb_profile::{NdbProfile, NdbProfileRecord};
pub use ndb_str::{NdbStr, NdbStrVariant};
pub use note::{Note, NoteBuildOptions, NoteBuilder, NoteKey};
pub use profile::{ProfileKey, ProfileRecord};
pub use query::QueryResult;
+pub use relay::NoteRelays;
pub use result::Result;
pub use subscription::Subscription;
pub use tags::{Tag, TagIter, Tags, TagsIter};
diff --git a/src/ndb.rs b/src/ndb.rs
@@ -3,8 +3,9 @@ use std::ptr;
use crate::bindings::ndb_search;
use crate::{
- bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult,
- Result, Subscription, SubscriptionState, SubscriptionStream, Transaction,
+ bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, ProfileKey,
+ ProfileRecord, QueryResult, Result, Subscription, SubscriptionState, SubscriptionStream,
+ Transaction,
};
use futures::StreamExt;
use std::collections::hash_map::Entry;
@@ -122,18 +123,20 @@ impl Ndb {
Ok(Ndb { refs, subs })
}
- /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
+ /// Ingest a relay or client sent event, with optional relay metadata.
/// This function returns immediately and doesn't provide any information on
/// if ingestion was successful or not.
- pub fn process_event(&self, json: &str) -> Result<()> {
+ pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> {
// Convert the Rust string to a C-style string
- let c_json = CString::new(json).expect("CString::new failed");
+ let c_json = CString::new(json)?;
let c_json_ptr = c_json.as_ptr();
// Get the length of the string
let len = json.len() as libc::c_int;
- let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) };
+ let res = unsafe {
+ bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr())
+ };
if res == 0 {
return Err(Error::NoteProcessFailed);
@@ -142,24 +145,24 @@ impl Ndb {
Ok(())
}
+ /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
+ /// This function returns immediately and doesn't provide any information on
+ /// if ingestion was successful or not.
+ #[deprecated(
+ note = "Use `process_event_with` with IngestMetadata::new().client(false).relay(...)"
+ )]
+ pub fn process_event(&self, json: &str) -> Result<()> {
+ self.process_event_with(json, IngestMetadata::new().client(false))
+ }
+
/// Ingest a client-sent event in the form `["EVENT", {"id:"...}]`
/// This function returns immediately and doesn't provide any information on
/// if ingestion was successful or not.
+ #[deprecated(
+ note = "Use `process_event_with` with IngestMetadata::new().client(true).relay(...)"
+ )]
pub fn process_client_event(&self, json: &str) -> Result<()> {
- // Convert the Rust string to a C-style string
- let c_json = CString::new(json).expect("CString::new failed");
- let c_json_ptr = c_json.as_ptr();
-
- // Get the length of the string
- let len = json.len() as libc::c_int;
-
- let res = unsafe { bindings::ndb_process_client_event(self.as_ptr(), c_json_ptr, len) };
-
- if res == 0 {
- return Err(Error::NoteProcessFailed);
- }
-
- Ok(())
+ self.process_event_with(json, IngestMetadata::new().client(true))
}
pub fn query<'a>(
diff --git a/src/note.rs b/src/note.rs
@@ -1,8 +1,5 @@
-use crate::tags::Tags;
-use crate::transaction::Transaction;
-use crate::{bindings, Error};
-use ::std::os::raw::c_uchar;
-use std::hash::Hash;
+use crate::{bindings, tags::Tags, transaction::Transaction, Error, NoteRelays};
+use std::{hash::Hash, os::raw::c_uchar};
#[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub struct NoteKey(u64);
@@ -151,6 +148,16 @@ impl<'a> Note<'a> {
}
}
+ /// Returns a database cursor that iterates over all of the relays
+ /// that the note has been seen on
+ pub fn relays(&self, txn: &'a Transaction) -> NoteRelays<'a> {
+ let Some(note_key) = self.key() else {
+ return NoteRelays::empty();
+ };
+
+ NoteRelays::new(txn, note_key)
+ }
+
pub fn key(&self) -> Option<NoteKey> {
match self {
Note::Transactional { key, .. } => Some(NoteKey::new(key.as_u64())),
diff --git a/src/relay.rs b/src/relay.rs
@@ -0,0 +1,139 @@
+use crate::{bindings, NoteKey, Transaction};
+use tracing::error;
+
+#[derive(Debug)]
+pub struct NoteRelaysIter<'a> {
+ _txn: &'a Transaction,
+ iter: bindings::ndb_note_relay_iterator,
+}
+
+#[derive(Debug)]
+pub enum NoteRelays<'a> {
+ Empty,
+ Active(NoteRelaysIter<'a>),
+}
+
+impl<'a> NoteRelays<'a> {
+ pub fn empty() -> Self {
+ Self::Empty
+ }
+
+ pub fn new(txn: &'a Transaction, note_key: NoteKey) -> Self {
+ Self::Active(NoteRelaysIter::new(txn, note_key))
+ }
+}
+
+impl<'a> NoteRelaysIter<'a> {
+ pub fn as_mut_ptr(&mut self) -> *mut bindings::ndb_note_relay_iterator {
+ &mut self.iter
+ }
+
+ pub fn new(txn: &'a Transaction, note_key: NoteKey) -> Self {
+ let note_key = note_key.as_u64();
+ let mut val = Self {
+ _txn: txn,
+ iter: empty_iterator(),
+ };
+
+ let ok = unsafe {
+ bindings::ndb_note_relay_iterate_start(txn.as_mut_ptr(), val.as_mut_ptr(), note_key)
+ };
+
+ if ok == 0 {
+ // NOTE (jb55): this should never happen, no need to burden the api. let's log just in case?
+ error!("error starting note relay iterator? {}", note_key);
+ }
+
+ val
+ }
+}
+
+fn empty_iterator() -> bindings::ndb_note_relay_iterator {
+ bindings::ndb_note_relay_iterator {
+ txn: std::ptr::null_mut(),
+ note_key: 0,
+ cursor_op: 0,
+ mdb_cur: std::ptr::null_mut(),
+ }
+}
+
+impl<'a> Iterator for NoteRelays<'a> {
+ type Item = &'a str;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let iter = match self {
+ Self::Empty => {
+ return None;
+ }
+ Self::Active(iter) => iter,
+ };
+
+ let relay = unsafe { bindings::ndb_note_relay_iterate_next(iter.as_mut_ptr()) };
+ if relay.is_null() {
+ return None;
+ }
+
+ let relay = unsafe {
+ let byte_slice = std::slice::from_raw_parts(relay as *const u8, libc::strlen(relay));
+ std::str::from_utf8_unchecked(byte_slice)
+ };
+
+ Some(relay)
+ }
+}
+
+impl Drop for NoteRelays<'_> {
+ fn drop(&mut self) {
+ let iter = match self {
+ Self::Empty => {
+ return;
+ }
+ Self::Active(iter) => iter,
+ };
+
+ unsafe {
+ bindings::ndb_note_relay_iterate_close(iter.as_mut_ptr());
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{config::Config, test_util, IngestMetadata, Ndb};
+ use tokio::time::{self, sleep, Duration};
+
+ #[test]
+ fn process_event_relays_works() {
+ let db = "target/testdbs/relays_work";
+ test_util::cleanup_db(&db);
+
+ {
+ let ndb = Ndb::new(db, &Config::new()).expect("ndb");
+ let eva = r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;
+ ndb.process_event_with(eva, IngestMetadata::new().client(false).relay("a"))
+ .expect("process ok");
+ let evb = r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;
+ ndb.process_event_with(evb, IngestMetadata::new().client(false).relay("b"))
+ .expect("process ok");
+ let evc = r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;
+ ndb.process_event_with(evc, IngestMetadata::new().client(false).relay("c"))
+ .expect("process ok");
+ }
+
+ {
+ let ndb = Ndb::new(db, &Config::new()).expect("ndb");
+ let id =
+ hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
+ .expect("hex id");
+ let mut txn = Transaction::new(&ndb).expect("txn");
+ let id_bytes: [u8; 32] = id.try_into().expect("id bytes");
+ let note = ndb.get_note_by_id(&txn, &id_bytes).expect("note");
+
+ let relays: Vec<&str> = note.relays(&txn).collect();
+ assert_eq!(relays, vec!["a", "b", "c"]);
+
+ assert_eq!(note.kind(), 1);
+ }
+ }
+}