nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (13692B)


      1 use libc;
      2 use std::ffi::CString;
      3 use std::ptr;
      4 
      5 use crate::{
      6     bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileRecord, QueryResult, Result,
      7     Subscription, Transaction,
      8 };
      9 use std::fs;
     10 use std::os::raw::c_int;
     11 use std::path::Path;
     12 use std::sync::Arc;
     13 use tokio::task; // Make sure to import the task module
     14 
     15 #[derive(Debug)]
     16 struct NdbRef {
     17     ndb: *mut bindings::ndb,
     18 }
     19 
     20 /// It's safe to have multi-threaded references to this because thread safety
     21 /// is guaranteed by LMDB
     22 unsafe impl Send for NdbRef {}
     23 unsafe impl Sync for NdbRef {}
     24 
     25 /// The database is automatically closed when [Ndb] is [Drop]ped.
     26 impl Drop for NdbRef {
     27     fn drop(&mut self) {
     28         unsafe {
     29             bindings::ndb_destroy(self.ndb);
     30         }
     31     }
     32 }
     33 
     34 /// A nostrdb context. Construct one of these with [Ndb::new].
     35 #[derive(Debug, Clone)]
     36 pub struct Ndb {
     37     refs: Arc<NdbRef>,
     38 }
     39 
     40 impl Ndb {
     41     /// Construct a new nostrdb context. Takes a directory where the database
     42     /// is/will be located and a nostrdb config.
     43     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     44         let db_dir_cstr = match CString::new(db_dir) {
     45             Ok(cstr) => cstr,
     46             Err(_) => return Err(Error::DbOpenFailed),
     47         };
     48         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     49 
     50         let path = Path::new(db_dir);
     51         if !path.exists() {
     52             let _ = fs::create_dir_all(&path);
     53         }
     54 
     55         let result = unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
     56 
     57         if result == 0 {
     58             return Err(Error::DbOpenFailed);
     59         }
     60 
     61         let refs = Arc::new(NdbRef { ndb });
     62         Ok(Ndb { refs })
     63     }
     64 
     65     /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
     66     /// This function returns immediately and doesn't provide any information on
     67     /// if ingestion was successful or not.
     68     pub fn process_event(&self, json: &str) -> Result<()> {
     69         // Convert the Rust string to a C-style string
     70         let c_json = CString::new(json).expect("CString::new failed");
     71         let c_json_ptr = c_json.as_ptr();
     72 
     73         // Get the length of the string
     74         let len = json.len() as libc::c_int;
     75 
     76         let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) };
     77 
     78         if res == 0 {
     79             return Err(Error::NoteProcessFailed);
     80         }
     81 
     82         Ok(())
     83     }
     84 
     85     pub fn query<'a>(
     86         &self,
     87         txn: &'a Transaction,
     88         filters: Vec<Filter>,
     89         max_results: i32,
     90     ) -> Result<Vec<QueryResult<'a>>> {
     91         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
     92         let mut out: Vec<bindings::ndb_query_result> = vec![];
     93         let mut returned: i32 = 0;
     94         out.reserve_exact(max_results as usize);
     95         let res = unsafe {
     96             bindings::ndb_query(
     97                 txn.as_mut_ptr(),
     98                 ndb_filters.as_mut_ptr(),
     99                 ndb_filters.len() as i32,
    100                 out.as_mut_ptr(),
    101                 max_results,
    102                 &mut returned as *mut i32,
    103             )
    104         };
    105         if res == 1 {
    106             unsafe {
    107                 out.set_len(returned as usize);
    108             };
    109             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    110         } else {
    111             Err(Error::QueryError)
    112         }
    113     }
    114 
    115     pub fn subscribe(&self, filters: Vec<Filter>) -> Result<Subscription> {
    116         unsafe {
    117             let mut ndb_filters: Vec<bindings::ndb_filter> =
    118                 filters.iter().map(|a| a.data).collect();
    119             let id = bindings::ndb_subscribe(
    120                 self.as_ptr(),
    121                 ndb_filters.as_mut_ptr(),
    122                 filters.len() as i32,
    123             );
    124             if id == 0 {
    125                 Err(Error::SubscriptionError)
    126             } else {
    127                 Ok(Subscription { filters, id })
    128             }
    129         }
    130     }
    131 
    132     pub fn poll_for_notes(&self, sub: &Subscription, max_notes: u32) -> Vec<NoteKey> {
    133         let mut vec = vec![];
    134         vec.reserve_exact(max_notes as usize);
    135         let sub_id = sub.id;
    136 
    137         unsafe {
    138             let res = bindings::ndb_poll_for_notes(
    139                 self.as_ptr(),
    140                 sub_id,
    141                 vec.as_mut_ptr(),
    142                 max_notes as c_int,
    143             );
    144             vec.set_len(res as usize);
    145         };
    146 
    147         vec.into_iter().map(|n| NoteKey::new(n)).collect()
    148     }
    149 
    150     pub async fn wait_for_notes(&self, sub: &Subscription, max_notes: u32) -> Result<Vec<NoteKey>> {
    151         let ndb = self.clone();
    152         let sub_id = sub.id;
    153         let handle = task::spawn_blocking(move || {
    154             let mut vec: Vec<u64> = vec![];
    155             vec.reserve_exact(max_notes as usize);
    156             let res = unsafe {
    157                 bindings::ndb_wait_for_notes(
    158                     ndb.as_ptr(),
    159                     sub_id,
    160                     vec.as_mut_ptr(),
    161                     max_notes as c_int,
    162                 )
    163             };
    164             if res == 0 {
    165                 Err(Error::SubscriptionError)
    166             } else {
    167                 unsafe {
    168                     vec.set_len(res as usize);
    169                 };
    170                 Ok(vec)
    171             }
    172         });
    173 
    174         match handle.await {
    175             Ok(Ok(res)) => Ok(res.into_iter().map(|n| NoteKey::new(n)).collect()),
    176             Ok(Err(err)) => Err(err),
    177             Err(_) => Err(Error::SubscriptionError),
    178         }
    179     }
    180 
    181     pub fn get_profile_by_pubkey<'a>(
    182         &self,
    183         transaction: &'a Transaction,
    184         id: &[u8; 32],
    185     ) -> Result<ProfileRecord<'a>> {
    186         let mut len: usize = 0;
    187         let mut primkey: u64 = 0;
    188 
    189         let profile_record_ptr = unsafe {
    190             bindings::ndb_get_profile_by_pubkey(
    191                 transaction.as_mut_ptr(),
    192                 id.as_ptr(),
    193                 &mut len,
    194                 &mut primkey,
    195             )
    196         };
    197 
    198         if profile_record_ptr.is_null() {
    199             // Handle null pointer (e.g., note not found or error occurred)
    200             return Err(Error::NotFound);
    201         }
    202 
    203         // Convert the raw pointer to a Note instance
    204         Ok(ProfileRecord::new(
    205             profile_record_ptr,
    206             len,
    207             primkey,
    208             transaction,
    209         ))
    210     }
    211 
    212     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> {
    213         let res = unsafe {
    214             bindings::ndb_get_notekey_by_id(
    215                 txn.as_mut_ptr(),
    216                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    217             )
    218         };
    219 
    220         if res == 0 {
    221             return Err(Error::NotFound);
    222         }
    223 
    224         Ok(res)
    225     }
    226 
    227     pub fn get_blocks_by_key<'a>(
    228         &self,
    229         txn: &'a Transaction,
    230         note_key: NoteKey,
    231     ) -> Result<Blocks<'a>> {
    232         let blocks_ptr = unsafe {
    233             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    234         };
    235 
    236         if blocks_ptr.is_null() {
    237             return Err(Error::NotFound);
    238         }
    239 
    240         Ok(Blocks::new_transactional(blocks_ptr, txn))
    241     }
    242 
    243     pub fn get_note_by_key<'a>(
    244         &self,
    245         transaction: &'a Transaction,
    246         note_key: NoteKey,
    247     ) -> Result<Note<'a>> {
    248         let mut len: usize = 0;
    249 
    250         let note_ptr = unsafe {
    251             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    252         };
    253 
    254         if note_ptr.is_null() {
    255             // Handle null pointer (e.g., note not found or error occurred)
    256             return Err(Error::NotFound);
    257         }
    258 
    259         // Convert the raw pointer to a Note instance
    260         Ok(Note::new_transactional(
    261             note_ptr,
    262             len,
    263             note_key,
    264             transaction,
    265         ))
    266     }
    267 
    268     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    269     pub fn get_note_by_id<'a>(
    270         &self,
    271         transaction: &'a Transaction,
    272         id: &[u8; 32],
    273     ) -> Result<Note<'a>> {
    274         let mut len: usize = 0;
    275         let mut primkey: u64 = 0;
    276 
    277         let note_ptr = unsafe {
    278             bindings::ndb_get_note_by_id(
    279                 transaction.as_mut_ptr(),
    280                 id.as_ptr(),
    281                 &mut len,
    282                 &mut primkey,
    283             )
    284         };
    285 
    286         if note_ptr.is_null() {
    287             // Handle null pointer (e.g., note not found or error occurred)
    288             return Err(Error::NotFound);
    289         }
    290 
    291         // Convert the raw pointer to a Note instance
    292         Ok(Note::new_transactional(
    293             note_ptr,
    294             len,
    295             NoteKey::new(primkey),
    296             transaction,
    297         ))
    298     }
    299 
    300     /// Get the underlying pointer to the context in C
    301     pub fn as_ptr(&self) -> *mut bindings::ndb {
    302         return self.refs.ndb;
    303     }
    304 }
    305 
    306 #[cfg(test)]
    307 mod tests {
    308     use super::*;
    309     use crate::config::Config;
    310     use crate::test_util;
    311 
    312     #[test]
    313     fn ndb_init_works() {
    314         let db = "target/testdbs/init_works";
    315         test_util::cleanup_db(db);
    316 
    317         {
    318             let cfg = Config::new();
    319             let _ = Ndb::new(db, &cfg).expect("ok");
    320         }
    321     }
    322 
    323     #[tokio::test]
    324     async fn query_works() {
    325         let db = "target/testdbs/query";
    326         test_util::cleanup_db(&db);
    327 
    328         {
    329             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    330 
    331             let filter = Filter::new().kinds(vec![1]).build();
    332             let filters = vec![filter];
    333 
    334             let sub = ndb.subscribe(filters.clone()).expect("sub_id");
    335             let waiter = ndb.wait_for_notes(&sub, 1);
    336             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    337             let res = waiter.await.expect("await ok");
    338             assert_eq!(res, vec![NoteKey::new(1)]);
    339             let txn = Transaction::new(&ndb).expect("txn");
    340             let res = ndb.query(&txn, filters, 1).expect("query ok");
    341             assert_eq!(res.len(), 1);
    342             assert_eq!(
    343                 hex::encode(res[0].note.id()),
    344                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    345             );
    346         }
    347     }
    348 
    349     #[tokio::test]
    350     async fn subscribe_event_works() {
    351         let db = "target/testdbs/subscribe";
    352         test_util::cleanup_db(&db);
    353 
    354         {
    355             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    356 
    357             let filter = Filter::new().kinds(vec![1]).build();
    358 
    359             let sub = ndb.subscribe(vec![filter]).expect("sub_id");
    360             let waiter = ndb.wait_for_notes(&sub, 1);
    361             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    362             let res = waiter.await.expect("await ok");
    363             assert_eq!(res, vec![NoteKey::new(1)]);
    364         }
    365     }
    366 
    367     #[test]
    368     fn poll_note_works() {
    369         let db = "target/testdbs/poll";
    370         test_util::cleanup_db(&db);
    371 
    372         {
    373             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    374 
    375             let filter = Filter::new().kinds(vec![1]).build();
    376 
    377             let sub = ndb.subscribe(vec![filter]).expect("sub_id");
    378             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    379             // this is too fast, we should have nothing
    380             let res = ndb.poll_for_notes(&sub, 1);
    381             assert_eq!(res, vec![]);
    382 
    383             std::thread::sleep(std::time::Duration::from_millis(100));
    384             // now we should have something
    385             let res = ndb.poll_for_notes(&sub, 1);
    386             assert_eq!(res, vec![NoteKey::new(1)]);
    387         }
    388     }
    389 
    390     #[test]
    391     fn process_event_works() {
    392         let db = "target/testdbs/event_works";
    393         test_util::cleanup_db(&db);
    394 
    395         {
    396             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    397             ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    398         }
    399 
    400         {
    401             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    402             let id =
    403                 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
    404                     .expect("hex id");
    405             let mut txn = Transaction::new(&ndb).expect("txn");
    406             let id_bytes: [u8; 32] = id.try_into().expect("id bytes");
    407             let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note");
    408             assert_eq!(note.kind(), 1);
    409         }
    410     }
    411 }