nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (14412B)


      1 use std::ffi::CString;
      2 use std::ptr;
      3 
      4 use crate::{
      5     bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult,
      6     Result, Subscription, Transaction,
      7 };
      8 use std::fs;
      9 use std::os::raw::c_int;
     10 use std::path::Path;
     11 use std::sync::Arc;
     12 use tokio::task; // Make sure to import the task module
     13 
     14 #[derive(Debug)]
     15 struct NdbRef {
     16     ndb: *mut bindings::ndb,
     17 }
     18 
     19 /// It's safe to have multi-threaded references to this because thread safety
     20 /// is guaranteed by LMDB
     21 unsafe impl Send for NdbRef {}
     22 unsafe impl Sync for NdbRef {}
     23 
     24 /// The database is automatically closed when [Ndb] is [Drop]ped.
     25 impl Drop for NdbRef {
     26     fn drop(&mut self) {
     27         unsafe {
     28             bindings::ndb_destroy(self.ndb);
     29         }
     30     }
     31 }
     32 
     33 /// A nostrdb context. Construct one of these with [Ndb::new].
     34 #[derive(Debug, Clone)]
     35 pub struct Ndb {
     36     refs: Arc<NdbRef>,
     37 }
     38 
     39 impl Ndb {
     40     /// Construct a new nostrdb context. Takes a directory where the database
     41     /// is/will be located and a nostrdb config.
     42     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     43         let db_dir_cstr = match CString::new(db_dir) {
     44             Ok(cstr) => cstr,
     45             Err(_) => return Err(Error::DbOpenFailed),
     46         };
     47         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     48 
     49         let path = Path::new(db_dir);
     50         if !path.exists() {
     51             let _ = fs::create_dir_all(path);
     52         }
     53 
     54         let result = unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
     55 
     56         if result == 0 {
     57             return Err(Error::DbOpenFailed);
     58         }
     59 
     60         let refs = Arc::new(NdbRef { ndb });
     61         Ok(Ndb { refs })
     62     }
     63 
     64     /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
     65     /// This function returns immediately and doesn't provide any information on
     66     /// if ingestion was successful or not.
     67     pub fn process_event(&self, json: &str) -> Result<()> {
     68         // Convert the Rust string to a C-style string
     69         let c_json = CString::new(json).expect("CString::new failed");
     70         let c_json_ptr = c_json.as_ptr();
     71 
     72         // Get the length of the string
     73         let len = json.len() as libc::c_int;
     74 
     75         let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) };
     76 
     77         if res == 0 {
     78             return Err(Error::NoteProcessFailed);
     79         }
     80 
     81         Ok(())
     82     }
     83 
     84     pub fn query<'a>(
     85         &self,
     86         txn: &'a Transaction,
     87         filters: Vec<Filter>,
     88         max_results: i32,
     89     ) -> Result<Vec<QueryResult<'a>>> {
     90         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
     91         let mut out: Vec<bindings::ndb_query_result> = vec![];
     92         let mut returned: i32 = 0;
     93         out.reserve_exact(max_results as usize);
     94         let res = unsafe {
     95             bindings::ndb_query(
     96                 txn.as_mut_ptr(),
     97                 ndb_filters.as_mut_ptr(),
     98                 ndb_filters.len() as i32,
     99                 out.as_mut_ptr(),
    100                 max_results,
    101                 &mut returned as *mut i32,
    102             )
    103         };
    104         if res == 1 {
    105             unsafe {
    106                 out.set_len(returned as usize);
    107             };
    108             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    109         } else {
    110             Err(Error::QueryError)
    111         }
    112     }
    113 
    114     pub fn subscribe(&self, filters: Vec<Filter>) -> Result<Subscription> {
    115         unsafe {
    116             let mut ndb_filters: Vec<bindings::ndb_filter> =
    117                 filters.iter().map(|a| a.data).collect();
    118             let id = bindings::ndb_subscribe(
    119                 self.as_ptr(),
    120                 ndb_filters.as_mut_ptr(),
    121                 filters.len() as i32,
    122             );
    123             if id == 0 {
    124                 Err(Error::SubscriptionError)
    125             } else {
    126                 Ok(Subscription { filters, id })
    127             }
    128         }
    129     }
    130 
    131     pub fn poll_for_notes(&self, sub: &Subscription, max_notes: u32) -> Vec<NoteKey> {
    132         let mut vec = vec![];
    133         vec.reserve_exact(max_notes as usize);
    134         let sub_id = sub.id;
    135 
    136         unsafe {
    137             let res = bindings::ndb_poll_for_notes(
    138                 self.as_ptr(),
    139                 sub_id,
    140                 vec.as_mut_ptr(),
    141                 max_notes as c_int,
    142             );
    143             vec.set_len(res as usize);
    144         };
    145 
    146         vec.into_iter().map(NoteKey::new).collect()
    147     }
    148 
    149     pub async fn wait_for_notes(&self, sub: &Subscription, max_notes: u32) -> Result<Vec<NoteKey>> {
    150         let ndb = self.clone();
    151         let sub_id = sub.id;
    152         let handle = task::spawn_blocking(move || {
    153             let mut vec: Vec<u64> = vec![];
    154             vec.reserve_exact(max_notes as usize);
    155             let res = unsafe {
    156                 bindings::ndb_wait_for_notes(
    157                     ndb.as_ptr(),
    158                     sub_id,
    159                     vec.as_mut_ptr(),
    160                     max_notes as c_int,
    161                 )
    162             };
    163             if res == 0 {
    164                 Err(Error::SubscriptionError)
    165             } else {
    166                 unsafe {
    167                     vec.set_len(res as usize);
    168                 };
    169                 Ok(vec)
    170             }
    171         });
    172 
    173         match handle.await {
    174             Ok(Ok(res)) => Ok(res.into_iter().map(NoteKey::new).collect()),
    175             Ok(Err(err)) => Err(err),
    176             Err(_) => Err(Error::SubscriptionError),
    177         }
    178     }
    179 
    180     pub fn get_profile_by_key<'a>(
    181         &self,
    182         transaction: &'a Transaction,
    183         key: ProfileKey,
    184     ) -> Result<ProfileRecord<'a>> {
    185         let mut len: usize = 0;
    186 
    187         let profile_record_ptr = unsafe {
    188             bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
    189         };
    190 
    191         if profile_record_ptr.is_null() {
    192             // Handle null pointer (e.g., note not found or error occurred)
    193             return Err(Error::NotFound);
    194         }
    195 
    196         // Convert the raw pointer to a Note instance
    197         Ok(ProfileRecord::new_transactional(
    198             profile_record_ptr,
    199             len,
    200             key,
    201             transaction,
    202         ))
    203     }
    204 
    205     pub fn get_profile_by_pubkey<'a>(
    206         &self,
    207         transaction: &'a Transaction,
    208         id: &[u8; 32],
    209     ) -> Result<ProfileRecord<'a>> {
    210         let mut len: usize = 0;
    211         let mut primkey: u64 = 0;
    212 
    213         let profile_record_ptr = unsafe {
    214             bindings::ndb_get_profile_by_pubkey(
    215                 transaction.as_mut_ptr(),
    216                 id.as_ptr(),
    217                 &mut len,
    218                 &mut primkey,
    219             )
    220         };
    221 
    222         if profile_record_ptr.is_null() {
    223             // Handle null pointer (e.g., note not found or error occurred)
    224             return Err(Error::NotFound);
    225         }
    226 
    227         // Convert the raw pointer to a Note instance
    228         Ok(ProfileRecord::new_transactional(
    229             profile_record_ptr,
    230             len,
    231             ProfileKey::new(primkey),
    232             transaction,
    233         ))
    234     }
    235 
    236     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> {
    237         let res = unsafe {
    238             bindings::ndb_get_notekey_by_id(
    239                 txn.as_mut_ptr(),
    240                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    241             )
    242         };
    243 
    244         if res == 0 {
    245             return Err(Error::NotFound);
    246         }
    247 
    248         Ok(res)
    249     }
    250 
    251     pub fn get_blocks_by_key<'a>(
    252         &self,
    253         txn: &'a Transaction,
    254         note_key: NoteKey,
    255     ) -> Result<Blocks<'a>> {
    256         let blocks_ptr = unsafe {
    257             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    258         };
    259 
    260         if blocks_ptr.is_null() {
    261             return Err(Error::NotFound);
    262         }
    263 
    264         Ok(Blocks::new_transactional(blocks_ptr, txn))
    265     }
    266 
    267     pub fn get_note_by_key<'a>(
    268         &self,
    269         transaction: &'a Transaction,
    270         note_key: NoteKey,
    271     ) -> Result<Note<'a>> {
    272         let mut len: usize = 0;
    273 
    274         let note_ptr = unsafe {
    275             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    276         };
    277 
    278         if note_ptr.is_null() {
    279             // Handle null pointer (e.g., note not found or error occurred)
    280             return Err(Error::NotFound);
    281         }
    282 
    283         // Convert the raw pointer to a Note instance
    284         Ok(Note::new_transactional(
    285             note_ptr,
    286             len,
    287             note_key,
    288             transaction,
    289         ))
    290     }
    291 
    292     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    293     pub fn get_note_by_id<'a>(
    294         &self,
    295         transaction: &'a Transaction,
    296         id: &[u8; 32],
    297     ) -> Result<Note<'a>> {
    298         let mut len: usize = 0;
    299         let mut primkey: u64 = 0;
    300 
    301         let note_ptr = unsafe {
    302             bindings::ndb_get_note_by_id(
    303                 transaction.as_mut_ptr(),
    304                 id.as_ptr(),
    305                 &mut len,
    306                 &mut primkey,
    307             )
    308         };
    309 
    310         if note_ptr.is_null() {
    311             // Handle null pointer (e.g., note not found or error occurred)
    312             return Err(Error::NotFound);
    313         }
    314 
    315         // Convert the raw pointer to a Note instance
    316         Ok(Note::new_transactional(
    317             note_ptr,
    318             len,
    319             NoteKey::new(primkey),
    320             transaction,
    321         ))
    322     }
    323 
    324     /// Get the underlying pointer to the context in C
    325     pub fn as_ptr(&self) -> *mut bindings::ndb {
    326         self.refs.ndb
    327     }
    328 }
    329 
    330 #[cfg(test)]
    331 mod tests {
    332     use super::*;
    333     use crate::config::Config;
    334     use crate::test_util;
    335 
    /// Opening a fresh database directory succeeds.
    #[test]
    fn ndb_init_works() {
        let db_path = "target/testdbs/init_works";
        test_util::cleanup_db(db_path);

        let config = Config::new();
        // The handle is discarded immediately; only successful opening matters.
        let _ = Ndb::new(db_path, &config).expect("ok");
    }
    346 
    347     #[tokio::test]
    348     async fn query_works() {
    349         let db = "target/testdbs/query";
    350         test_util::cleanup_db(&db);
    351 
    352         {
    353             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    354 
    355             let filter = Filter::new().kinds(vec![1]).build();
    356             let filters = vec![filter];
    357 
    358             let sub = ndb.subscribe(filters.clone()).expect("sub_id");
    359             let waiter = ndb.wait_for_notes(&sub, 1);
    360             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    361             let res = waiter.await.expect("await ok");
    362             assert_eq!(res, vec![NoteKey::new(1)]);
    363             let txn = Transaction::new(&ndb).expect("txn");
    364             let res = ndb.query(&txn, filters, 1).expect("query ok");
    365             assert_eq!(res.len(), 1);
    366             assert_eq!(
    367                 hex::encode(res[0].note.id()),
    368                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    369             );
    370         }
    371     }
    372 
    373     #[tokio::test]
    374     async fn subscribe_event_works() {
    375         let db = "target/testdbs/subscribe";
    376         test_util::cleanup_db(&db);
    377 
    378         {
    379             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    380 
    381             let filter = Filter::new().kinds(vec![1]).build();
    382 
    383             let sub = ndb.subscribe(vec![filter]).expect("sub_id");
    384             let waiter = ndb.wait_for_notes(&sub, 1);
    385             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    386             let res = waiter.await.expect("await ok");
    387             assert_eq!(res, vec![NoteKey::new(1)]);
    388         }
    389     }
    390 
    /// Polling right after ingestion yields nothing; polling again after a
    /// short delay yields the ingested note.
    #[test]
    fn poll_note_works() {
        // Relay-style EVENT message carrying a single kind-1 note.
        const EVENT: &str = r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;

        let db = "target/testdbs/poll";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let kind_one = Filter::new().kinds(vec![1]).build();

        let sub = ndb.subscribe(vec![kind_one]).expect("sub_id");
        ndb.process_event(EVENT).expect("process ok");

        // this is too fast, we should have nothing
        // NOTE(review): this assertion depends on timing (ingestion not having
        // finished yet) and may be flaky on some machines.
        let immediate = ndb.poll_for_notes(&sub, 1);
        assert_eq!(immediate, vec![]);

        std::thread::sleep(std::time::Duration::from_millis(100));

        // now we should have something
        let delayed = ndb.poll_for_notes(&sub, 1);
        assert_eq!(delayed, vec![NoteKey::new(1)]);
    }
    413 
    /// A note ingested through one handle can be read back by id after the
    /// database has been closed and reopened.
    #[test]
    fn process_event_works() {
        // Relay-style EVENT message carrying a single kind-1 note.
        const EVENT: &str = r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;
        // Hex id of the note inside `EVENT`.
        const NOTE_ID: &str = "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3";

        let db = "target/testdbs/event_works";
        test_util::cleanup_db(db);

        // First scope: ingest, then drop the handle so the database is closed.
        {
            let writer = Ndb::new(db, &Config::new()).expect("ndb");
            writer.process_event(EVENT).expect("process ok");
        }

        // Second scope: reopen and look the note up by id.
        {
            let reader = Ndb::new(db, &Config::new()).expect("ndb");
            let raw_id = hex::decode(NOTE_ID).expect("hex id");
            let txn = Transaction::new(&reader).expect("txn");
            let id_bytes: [u8; 32] = raw_id.try_into().expect("id bytes");
            let note = reader.get_note_by_id(&txn, &id_bytes).expect("note");
            assert_eq!(note.kind(), 1);
        }
    }
    435 }