nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (14726B)


      1 use std::ffi::CString;
      2 use std::ptr;
      3 
      4 use crate::{
      5     bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult,
      6     Result, Subscription, Transaction,
      7 };
      8 use std::fs;
      9 use std::os::raw::c_int;
     10 use std::path::Path;
     11 use std::sync::Arc;
     12 use tokio::task; // Make sure to import the task module
     13 
     14 #[derive(Debug)]
     15 struct NdbRef {
     16     ndb: *mut bindings::ndb,
     17 }
     18 
     19 /// It's safe to have multi-threaded references to this because thread safety
     20 /// is guaranteed by LMDB
     21 unsafe impl Send for NdbRef {}
     22 unsafe impl Sync for NdbRef {}
     23 
     24 /// The database is automatically closed when [Ndb] is [Drop]ped.
     25 impl Drop for NdbRef {
     26     fn drop(&mut self) {
     27         unsafe {
     28             bindings::ndb_destroy(self.ndb);
     29         }
     30     }
     31 }
     32 
     33 /// A nostrdb context. Construct one of these with [Ndb::new].
     34 #[derive(Debug, Clone)]
     35 pub struct Ndb {
     36     refs: Arc<NdbRef>,
     37 }
     38 
     39 impl Ndb {
     40     /// Construct a new nostrdb context. Takes a directory where the database
     41     /// is/will be located and a nostrdb config.
     42     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     43         let db_dir_cstr = match CString::new(db_dir) {
     44             Ok(cstr) => cstr,
     45             Err(_) => return Err(Error::DbOpenFailed),
     46         };
     47         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     48 
     49         let path = Path::new(db_dir);
     50         if !path.exists() {
     51             let _ = fs::create_dir_all(path);
     52         }
     53 
     54         let result = unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
     55 
     56         if result == 0 {
     57             return Err(Error::DbOpenFailed);
     58         }
     59 
     60         let refs = Arc::new(NdbRef { ndb });
     61         Ok(Ndb { refs })
     62     }
     63 
     64     /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
     65     /// This function returns immediately and doesn't provide any information on
     66     /// if ingestion was successful or not.
     67     pub fn process_event(&self, json: &str) -> Result<()> {
     68         // Convert the Rust string to a C-style string
     69         let c_json = CString::new(json).expect("CString::new failed");
     70         let c_json_ptr = c_json.as_ptr();
     71 
     72         // Get the length of the string
     73         let len = json.len() as libc::c_int;
     74 
     75         let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) };
     76 
     77         if res == 0 {
     78             return Err(Error::NoteProcessFailed);
     79         }
     80 
     81         Ok(())
     82     }
     83 
     84     pub fn query<'a>(
     85         &self,
     86         txn: &'a Transaction,
     87         filters: Vec<Filter>,
     88         max_results: i32,
     89     ) -> Result<Vec<QueryResult<'a>>> {
     90         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
     91         let mut out: Vec<bindings::ndb_query_result> = vec![];
     92         let mut returned: i32 = 0;
     93         out.reserve_exact(max_results as usize);
     94         let res = unsafe {
     95             bindings::ndb_query(
     96                 txn.as_mut_ptr(),
     97                 ndb_filters.as_mut_ptr(),
     98                 ndb_filters.len() as i32,
     99                 out.as_mut_ptr(),
    100                 max_results,
    101                 &mut returned as *mut i32,
    102             )
    103         };
    104         if res == 1 {
    105             unsafe {
    106                 out.set_len(returned as usize);
    107             };
    108             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    109         } else {
    110             Err(Error::QueryError)
    111         }
    112     }
    113 
    114     pub fn subscription_count(&self) -> u32 {
    115         unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
    116     }
    117 
    118     pub fn unsubscribe(&self, sub_id: u64) -> Result<()> {
    119         let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub_id) };
    120 
    121         if r == 0 {
    122             Err(Error::SubscriptionError)
    123         } else {
    124             Ok(())
    125         }
    126     }
    127 
    128     pub fn subscribe(&self, filters: Vec<Filter>) -> Result<Subscription> {
    129         unsafe {
    130             let mut ndb_filters: Vec<bindings::ndb_filter> =
    131                 filters.iter().map(|a| a.data).collect();
    132             let id = bindings::ndb_subscribe(
    133                 self.as_ptr(),
    134                 ndb_filters.as_mut_ptr(),
    135                 filters.len() as i32,
    136             );
    137             if id == 0 {
    138                 Err(Error::SubscriptionError)
    139             } else {
    140                 Ok(Subscription { filters, id })
    141             }
    142         }
    143     }
    144 
    145     pub fn poll_for_notes(&self, sub_id: u64, max_notes: u32) -> Vec<NoteKey> {
    146         let mut vec = vec![];
    147         vec.reserve_exact(max_notes as usize);
    148 
    149         unsafe {
    150             let res = bindings::ndb_poll_for_notes(
    151                 self.as_ptr(),
    152                 sub_id,
    153                 vec.as_mut_ptr(),
    154                 max_notes as c_int,
    155             );
    156             vec.set_len(res as usize);
    157         };
    158 
    159         vec.into_iter().map(NoteKey::new).collect()
    160     }
    161 
    162     pub async fn wait_for_notes(&self, sub_id: u64, max_notes: u32) -> Result<Vec<NoteKey>> {
    163         let ndb = self.clone();
    164         let handle = task::spawn_blocking(move || {
    165             let mut vec: Vec<u64> = vec![];
    166             vec.reserve_exact(max_notes as usize);
    167             let res = unsafe {
    168                 bindings::ndb_wait_for_notes(
    169                     ndb.as_ptr(),
    170                     sub_id,
    171                     vec.as_mut_ptr(),
    172                     max_notes as c_int,
    173                 )
    174             };
    175             if res == 0 {
    176                 Err(Error::SubscriptionError)
    177             } else {
    178                 unsafe {
    179                     vec.set_len(res as usize);
    180                 };
    181                 Ok(vec)
    182             }
    183         });
    184 
    185         match handle.await {
    186             Ok(Ok(res)) => Ok(res.into_iter().map(NoteKey::new).collect()),
    187             Ok(Err(err)) => Err(err),
    188             Err(_) => Err(Error::SubscriptionError),
    189         }
    190     }
    191 
    192     pub fn get_profile_by_key<'a>(
    193         &self,
    194         transaction: &'a Transaction,
    195         key: ProfileKey,
    196     ) -> Result<ProfileRecord<'a>> {
    197         let mut len: usize = 0;
    198 
    199         let profile_record_ptr = unsafe {
    200             bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
    201         };
    202 
    203         if profile_record_ptr.is_null() {
    204             // Handle null pointer (e.g., note not found or error occurred)
    205             return Err(Error::NotFound);
    206         }
    207 
    208         // Convert the raw pointer to a Note instance
    209         Ok(ProfileRecord::new_transactional(
    210             profile_record_ptr,
    211             len,
    212             key,
    213             transaction,
    214         ))
    215     }
    216 
    217     pub fn get_profile_by_pubkey<'a>(
    218         &self,
    219         transaction: &'a Transaction,
    220         id: &[u8; 32],
    221     ) -> Result<ProfileRecord<'a>> {
    222         let mut len: usize = 0;
    223         let mut primkey: u64 = 0;
    224 
    225         let profile_record_ptr = unsafe {
    226             bindings::ndb_get_profile_by_pubkey(
    227                 transaction.as_mut_ptr(),
    228                 id.as_ptr(),
    229                 &mut len,
    230                 &mut primkey,
    231             )
    232         };
    233 
    234         if profile_record_ptr.is_null() {
    235             // Handle null pointer (e.g., note not found or error occurred)
    236             return Err(Error::NotFound);
    237         }
    238 
    239         // Convert the raw pointer to a Note instance
    240         Ok(ProfileRecord::new_transactional(
    241             profile_record_ptr,
    242             len,
    243             ProfileKey::new(primkey),
    244             transaction,
    245         ))
    246     }
    247 
    248     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> {
    249         let res = unsafe {
    250             bindings::ndb_get_notekey_by_id(
    251                 txn.as_mut_ptr(),
    252                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    253             )
    254         };
    255 
    256         if res == 0 {
    257             return Err(Error::NotFound);
    258         }
    259 
    260         Ok(res)
    261     }
    262 
    263     pub fn get_blocks_by_key<'a>(
    264         &self,
    265         txn: &'a Transaction,
    266         note_key: NoteKey,
    267     ) -> Result<Blocks<'a>> {
    268         let blocks_ptr = unsafe {
    269             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    270         };
    271 
    272         if blocks_ptr.is_null() {
    273             return Err(Error::NotFound);
    274         }
    275 
    276         Ok(Blocks::new_transactional(blocks_ptr, txn))
    277     }
    278 
    279     pub fn get_note_by_key<'a>(
    280         &self,
    281         transaction: &'a Transaction,
    282         note_key: NoteKey,
    283     ) -> Result<Note<'a>> {
    284         let mut len: usize = 0;
    285 
    286         let note_ptr = unsafe {
    287             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    288         };
    289 
    290         if note_ptr.is_null() {
    291             // Handle null pointer (e.g., note not found or error occurred)
    292             return Err(Error::NotFound);
    293         }
    294 
    295         // Convert the raw pointer to a Note instance
    296         Ok(Note::new_transactional(
    297             note_ptr,
    298             len,
    299             note_key,
    300             transaction,
    301         ))
    302     }
    303 
    304     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    305     pub fn get_note_by_id<'a>(
    306         &self,
    307         transaction: &'a Transaction,
    308         id: &[u8; 32],
    309     ) -> Result<Note<'a>> {
    310         let mut len: usize = 0;
    311         let mut primkey: u64 = 0;
    312 
    313         let note_ptr = unsafe {
    314             bindings::ndb_get_note_by_id(
    315                 transaction.as_mut_ptr(),
    316                 id.as_ptr(),
    317                 &mut len,
    318                 &mut primkey,
    319             )
    320         };
    321 
    322         if note_ptr.is_null() {
    323             // Handle null pointer (e.g., note not found or error occurred)
    324             return Err(Error::NotFound);
    325         }
    326 
    327         // Convert the raw pointer to a Note instance
    328         Ok(Note::new_transactional(
    329             note_ptr,
    330             len,
    331             NoteKey::new(primkey),
    332             transaction,
    333         ))
    334     }
    335 
    336     /// Get the underlying pointer to the context in C
    337     pub fn as_ptr(&self) -> *mut bindings::ndb {
    338         self.refs.ndb
    339     }
    340 }
    341 
    342 #[cfg(test)]
    343 mod tests {
    344     use super::*;
    345     use crate::config::Config;
    346     use crate::test_util;
    347 
    /// Opening a database with a newly constructed `Config` succeeds.
    #[test]
    fn ndb_init_works() {
        let db_path = "target/testdbs/init_works";
        test_util::cleanup_db(db_path);

        let config = Config::new();
        let opened = Ndb::new(db_path, &config);
        let _ = opened.expect("ok");
    }
    358 
    359     #[tokio::test]
    360     async fn query_works() {
    361         let db = "target/testdbs/query";
    362         test_util::cleanup_db(&db);
    363 
    364         {
    365             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    366 
    367             let filter = Filter::new().kinds(vec![1]).build();
    368             let filters = vec![filter];
    369 
    370             let sub = ndb.subscribe(filters.clone()).expect("sub_id");
    371             let waiter = ndb.wait_for_notes(sub.id, 1);
    372             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    373             let res = waiter.await.expect("await ok");
    374             assert_eq!(res, vec![NoteKey::new(1)]);
    375             let txn = Transaction::new(&ndb).expect("txn");
    376             let res = ndb.query(&txn, filters, 1).expect("query ok");
    377             assert_eq!(res.len(), 1);
    378             assert_eq!(
    379                 hex::encode(res[0].note.id()),
    380                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    381             );
    382         }
    383     }
    384 
    385     #[tokio::test]
    386     async fn subscribe_event_works() {
    387         let db = "target/testdbs/subscribe";
    388         test_util::cleanup_db(&db);
    389 
    390         {
    391             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    392 
    393             let filter = Filter::new().kinds(vec![1]).build();
    394 
    395             let sub = ndb.subscribe(vec![filter]).expect("sub_id");
    396             let waiter = ndb.wait_for_notes(sub.id, 1);
    397             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    398             let res = waiter.await.expect("await ok");
    399             assert_eq!(res, vec![NoteKey::new(1)]);
    400         }
    401     }
    402 
    /// Polling right after `process_event` yields nothing; polling again after
    /// a short pause yields the processed note.
    #[test]
    fn poll_note_works() {
        let db_path = "target/testdbs/poll";
        test_util::cleanup_db(db_path);

        let event = r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;

        let ndb = Ndb::new(db_path, &Config::new()).expect("ndb");
        let kind_one = Filter::new().kinds(vec![1]).build();

        let sub = ndb.subscribe(vec![kind_one]).expect("sub_id");
        ndb.process_event(event).expect("process ok");

        // this is too fast, we should have nothing
        let first_poll = ndb.poll_for_notes(sub.id, 1);
        assert_eq!(first_poll, vec![]);

        let pause = std::time::Duration::from_millis(100);
        std::thread::sleep(pause);

        // now we should have something
        let second_poll = ndb.poll_for_notes(sub.id, 1);
        assert_eq!(second_poll, vec![NoteKey::new(1)]);
    }
    425 
    /// A note processed by one `Ndb` instance can be read back by id from a
    /// second instance opened on the same directory.
    #[test]
    fn process_event_works() {
        let db_path = "target/testdbs/event_works";
        test_util::cleanup_db(db_path);

        let event = r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;

        // First instance: ingest the event, then drop the handle at scope end.
        {
            let writer = Ndb::new(db_path, &Config::new()).expect("ndb");
            writer.process_event(event).expect("process ok");
        }

        // Second instance: read the note back by its id.
        {
            let reader = Ndb::new(db_path, &Config::new()).expect("ndb");
            let raw_id =
                hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
                    .expect("hex id");
            let txn = Transaction::new(&reader).expect("txn");
            let id_bytes: [u8; 32] = raw_id.try_into().expect("id bytes");

            let note = reader.get_note_by_id(&txn, &id_bytes).expect("note");
            assert_eq!(note.kind(), 1);
        }
    }
    447 }