nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (17116B)


      1 use std::ffi::CString;
      2 use std::ptr;
      3 
      4 use crate::{
      5     bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult,
      6     Result, Subscription, Transaction,
      7 };
      8 use std::fs;
      9 use std::os::raw::c_int;
     10 use std::path::Path;
     11 use std::sync::Arc;
     12 use tokio::task; // Make sure to import the task module
     13 use tracing::debug;
     14 
     15 #[derive(Debug)]
     16 struct NdbRef {
     17     ndb: *mut bindings::ndb,
     18 
     19     /// Have we configured a rust closure for our callback? If so we need
     20     /// to clean that up when this is dropped
     21     has_rust_closure: bool,
     22     rust_cb_ctx: *mut ::std::os::raw::c_void,
     23 }
     24 
     25 /// SAFETY: thread safety is ensured by nostrdb
     26 unsafe impl Send for NdbRef {}
     27 
     28 /// SAFETY: thread safety is ensured by nostrdb
     29 unsafe impl Sync for NdbRef {}
     30 
     31 /// The database is automatically closed when [Ndb] is [Drop]ped.
     32 impl Drop for NdbRef {
     33     fn drop(&mut self) {
     34         unsafe {
     35             bindings::ndb_destroy(self.ndb);
     36 
     37             if self.has_rust_closure && !self.rust_cb_ctx.is_null() {
     38                 // Rebuild the Box from the raw pointer and drop it.
     39                 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
     40             }
     41         }
     42     }
     43 }
     44 
     45 /// A nostrdb context. Construct one of these with [Ndb::new].
     46 #[derive(Debug, Clone)]
     47 pub struct Ndb {
     48     refs: Arc<NdbRef>,
     49 }
     50 
     51 impl Ndb {
     52     /// Construct a new nostrdb context. Takes a directory where the database
     53     /// is/will be located and a nostrdb config.
     54     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     55         let db_dir_cstr = match CString::new(db_dir) {
     56             Ok(cstr) => cstr,
     57             Err(_) => return Err(Error::DbOpenFailed),
     58         };
     59         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     60 
     61         let path = Path::new(db_dir);
     62         if !path.exists() {
     63             let _ = fs::create_dir_all(path);
     64         }
     65 
     66         let min_mapsize = 1024 * 1024 * 512;
     67         let mut mapsize = config.config.mapsize;
     68         let mut config = *config;
     69 
     70         let result = loop {
     71             let result =
     72                 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
     73 
     74             if result == 0 {
     75                 mapsize /= 2;
     76                 config = config.set_mapsize(mapsize);
     77                 debug!("ndb init failed, reducing mapsize to {}", mapsize);
     78 
     79                 if mapsize > min_mapsize {
     80                     continue;
     81                 } else {
     82                     break 0;
     83                 }
     84             } else {
     85                 break result;
     86             }
     87         };
     88 
     89         if result == 0 {
     90             return Err(Error::DbOpenFailed);
     91         }
     92 
     93         let has_rust_closure = !config.config.sub_cb_ctx.is_null();
     94         let rust_cb_ctx = config.config.sub_cb_ctx;
     95         let refs = Arc::new(NdbRef {
     96             ndb,
     97             has_rust_closure,
     98             rust_cb_ctx,
     99         });
    100 
    101         Ok(Ndb { refs })
    102     }
    103 
    104     /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
    105     /// This function returns immediately and doesn't provide any information on
    106     /// if ingestion was successful or not.
    107     pub fn process_event(&self, json: &str) -> Result<()> {
    108         // Convert the Rust string to a C-style string
    109         let c_json = CString::new(json).expect("CString::new failed");
    110         let c_json_ptr = c_json.as_ptr();
    111 
    112         // Get the length of the string
    113         let len = json.len() as libc::c_int;
    114 
    115         let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) };
    116 
    117         if res == 0 {
    118             return Err(Error::NoteProcessFailed);
    119         }
    120 
    121         Ok(())
    122     }
    123 
    124     pub fn query<'a>(
    125         &self,
    126         txn: &'a Transaction,
    127         filters: &[Filter],
    128         max_results: i32,
    129     ) -> Result<Vec<QueryResult<'a>>> {
    130         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
    131         let mut out: Vec<bindings::ndb_query_result> = vec![];
    132         let mut returned: i32 = 0;
    133         out.reserve_exact(max_results as usize);
    134         let res = unsafe {
    135             bindings::ndb_query(
    136                 txn.as_mut_ptr(),
    137                 ndb_filters.as_mut_ptr(),
    138                 ndb_filters.len() as i32,
    139                 out.as_mut_ptr(),
    140                 max_results,
    141                 &mut returned as *mut i32,
    142             )
    143         };
    144         if res == 1 {
    145             unsafe {
    146                 out.set_len(returned as usize);
    147             };
    148             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    149         } else {
    150             Err(Error::QueryError)
    151         }
    152     }
    153 
    154     pub fn subscription_count(&self) -> u32 {
    155         unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
    156     }
    157 
    158     pub fn unsubscribe(&self, sub: Subscription) -> Result<()> {
    159         let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) };
    160 
    161         if r == 0 {
    162             Err(Error::SubscriptionError)
    163         } else {
    164             Ok(())
    165         }
    166     }
    167 
    168     pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> {
    169         unsafe {
    170             let mut ndb_filters: Vec<bindings::ndb_filter> =
    171                 filters.iter().map(|a| a.data).collect();
    172             let id = bindings::ndb_subscribe(
    173                 self.as_ptr(),
    174                 ndb_filters.as_mut_ptr(),
    175                 filters.len() as i32,
    176             );
    177             if id == 0 {
    178                 Err(Error::SubscriptionError)
    179             } else {
    180                 Ok(Subscription::new(id))
    181             }
    182         }
    183     }
    184 
    185     pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> {
    186         let mut vec = vec![];
    187         vec.reserve_exact(max_notes as usize);
    188 
    189         unsafe {
    190             let res = bindings::ndb_poll_for_notes(
    191                 self.as_ptr(),
    192                 sub.id(),
    193                 vec.as_mut_ptr(),
    194                 max_notes as c_int,
    195             );
    196             vec.set_len(res as usize);
    197         };
    198 
    199         vec.into_iter().map(NoteKey::new).collect()
    200     }
    201 
    202     pub async fn wait_for_notes(
    203         &self,
    204         sub_id: Subscription,
    205         max_notes: u32,
    206     ) -> Result<Vec<NoteKey>> {
    207         let ndb = self.clone();
    208         let handle = task::spawn_blocking(move || {
    209             let mut vec: Vec<u64> = vec![];
    210             vec.reserve_exact(max_notes as usize);
    211             let res = unsafe {
    212                 bindings::ndb_wait_for_notes(
    213                     ndb.as_ptr(),
    214                     sub_id.id(),
    215                     vec.as_mut_ptr(),
    216                     max_notes as c_int,
    217                 )
    218             };
    219             if res == 0 {
    220                 Err(Error::SubscriptionError)
    221             } else {
    222                 unsafe {
    223                     vec.set_len(res as usize);
    224                 };
    225                 Ok(vec)
    226             }
    227         });
    228 
    229         match handle.await {
    230             Ok(Ok(res)) => Ok(res.into_iter().map(NoteKey::new).collect()),
    231             Ok(Err(err)) => Err(err),
    232             Err(_) => Err(Error::SubscriptionError),
    233         }
    234     }
    235 
    236     pub fn get_profile_by_key<'a>(
    237         &self,
    238         transaction: &'a Transaction,
    239         key: ProfileKey,
    240     ) -> Result<ProfileRecord<'a>> {
    241         let mut len: usize = 0;
    242 
    243         let profile_record_ptr = unsafe {
    244             bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
    245         };
    246 
    247         if profile_record_ptr.is_null() {
    248             // Handle null pointer (e.g., note not found or error occurred)
    249             return Err(Error::NotFound);
    250         }
    251 
    252         // Convert the raw pointer to a Note instance
    253         Ok(ProfileRecord::new_transactional(
    254             profile_record_ptr,
    255             len,
    256             key,
    257             transaction,
    258         ))
    259     }
    260 
    261     pub fn get_profile_by_pubkey<'a>(
    262         &self,
    263         transaction: &'a Transaction,
    264         id: &[u8; 32],
    265     ) -> Result<ProfileRecord<'a>> {
    266         let mut len: usize = 0;
    267         let mut primkey: u64 = 0;
    268 
    269         let profile_record_ptr = unsafe {
    270             bindings::ndb_get_profile_by_pubkey(
    271                 transaction.as_mut_ptr(),
    272                 id.as_ptr(),
    273                 &mut len,
    274                 &mut primkey,
    275             )
    276         };
    277 
    278         if profile_record_ptr.is_null() {
    279             // Handle null pointer (e.g., note not found or error occurred)
    280             return Err(Error::NotFound);
    281         }
    282 
    283         // Convert the raw pointer to a Note instance
    284         Ok(ProfileRecord::new_transactional(
    285             profile_record_ptr,
    286             len,
    287             ProfileKey::new(primkey),
    288             transaction,
    289         ))
    290     }
    291 
    292     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> {
    293         let res = unsafe {
    294             bindings::ndb_get_notekey_by_id(
    295                 txn.as_mut_ptr(),
    296                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    297             )
    298         };
    299 
    300         if res == 0 {
    301             return Err(Error::NotFound);
    302         }
    303 
    304         Ok(res)
    305     }
    306 
    307     pub fn get_blocks_by_key<'a>(
    308         &self,
    309         txn: &'a Transaction,
    310         note_key: NoteKey,
    311     ) -> Result<Blocks<'a>> {
    312         let blocks_ptr = unsafe {
    313             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    314         };
    315 
    316         if blocks_ptr.is_null() {
    317             return Err(Error::NotFound);
    318         }
    319 
    320         Ok(Blocks::new_transactional(blocks_ptr, txn))
    321     }
    322 
    323     pub fn get_note_by_key<'a>(
    324         &self,
    325         transaction: &'a Transaction,
    326         note_key: NoteKey,
    327     ) -> Result<Note<'a>> {
    328         let mut len: usize = 0;
    329 
    330         let note_ptr = unsafe {
    331             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    332         };
    333 
    334         if note_ptr.is_null() {
    335             // Handle null pointer (e.g., note not found or error occurred)
    336             return Err(Error::NotFound);
    337         }
    338 
    339         // Convert the raw pointer to a Note instance
    340         Ok(Note::new_transactional(
    341             note_ptr,
    342             len,
    343             note_key,
    344             transaction,
    345         ))
    346     }
    347 
    348     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    349     pub fn get_note_by_id<'a>(
    350         &self,
    351         transaction: &'a Transaction,
    352         id: &[u8; 32],
    353     ) -> Result<Note<'a>> {
    354         let mut len: usize = 0;
    355         let mut primkey: u64 = 0;
    356 
    357         let note_ptr = unsafe {
    358             bindings::ndb_get_note_by_id(
    359                 transaction.as_mut_ptr(),
    360                 id.as_ptr(),
    361                 &mut len,
    362                 &mut primkey,
    363             )
    364         };
    365 
    366         if note_ptr.is_null() {
    367             // Handle null pointer (e.g., note not found or error occurred)
    368             return Err(Error::NotFound);
    369         }
    370 
    371         // Convert the raw pointer to a Note instance
    372         Ok(Note::new_transactional(
    373             note_ptr,
    374             len,
    375             NoteKey::new(primkey),
    376             transaction,
    377         ))
    378     }
    379 
    380     /// Get the underlying pointer to the context in C
    381     pub fn as_ptr(&self) -> *mut bindings::ndb {
    382         self.refs.ndb
    383     }
    384 }
    385 
    386 #[cfg(test)]
    387 mod tests {
    388     use super::*;
    389     use crate::config::Config;
    390     use crate::test_util;
    391 
    /// Opening a freshly cleaned database directory with `Config::new()`
    /// succeeds.
    #[test]
    fn ndb_init_works() {
        let db_path = "target/testdbs/init_works";
        test_util::cleanup_db(db_path);

        {
            let config = Config::new();
            let opened = Ndb::new(db_path, &config);
            let _ = opened.expect("ok");
        }
    }
    402 
    403     #[tokio::test]
    404     async fn query_works() {
    405         let db = "target/testdbs/query";
    406         test_util::cleanup_db(&db);
    407 
    408         {
    409             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    410 
    411             let filter = Filter::new().kinds(vec![1]).build();
    412             let filters = vec![filter];
    413 
    414             let sub = ndb.subscribe(&filters).expect("sub_id");
    415             let waiter = ndb.wait_for_notes(sub, 1);
    416             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    417             let res = waiter.await.expect("await ok");
    418             assert_eq!(res, vec![NoteKey::new(1)]);
    419             let txn = Transaction::new(&ndb).expect("txn");
    420             let res = ndb.query(&txn, &filters, 1).expect("query ok");
    421             assert_eq!(res.len(), 1);
    422             assert_eq!(
    423                 hex::encode(res[0].note.id()),
    424                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    425             );
    426         }
    427     }
    428 
    429     #[tokio::test]
    430     async fn subscribe_event_works() {
    431         let db = "target/testdbs/subscribe";
    432         test_util::cleanup_db(&db);
    433 
    434         {
    435             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    436 
    437             let filter = Filter::new().kinds(vec![1]).build();
    438 
    439             let sub = ndb.subscribe(&[filter]).expect("sub_id");
    440             let waiter = ndb.wait_for_notes(sub, 1);
    441             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    442             let res = waiter.await.expect("await ok");
    443             assert_eq!(res, vec![NoteKey::new(1)]);
    444         }
    445     }
    446 
    /// Polling right after `process_event` returns nothing; polling again
    /// after a short sleep returns the new note key.
    #[test]
    fn poll_note_works() {
        let db_path = "target/testdbs/poll";
        test_util::cleanup_db(&db_path);

        {
            let event_json = r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;
            let ndb = Ndb::new(db_path, &Config::new()).expect("ndb");

            let kind_one = Filter::new().kinds(vec![1]).build();

            let sub = ndb.subscribe(&[kind_one]).expect("sub_id");
            ndb.process_event(event_json).expect("process ok");

            // this is too fast, we should have nothing
            let immediate = ndb.poll_for_notes(sub, 1);
            assert_eq!(immediate, vec![]);

            let settle_time = std::time::Duration::from_millis(100);
            std::thread::sleep(settle_time);

            // now we should have something
            let delayed = ndb.poll_for_notes(sub, 1);
            assert_eq!(delayed, vec![NoteKey::new(1)]);
        }
    }
    469 
    /// An event processed by one `Ndb` instance can be read back by id after
    /// the database is closed and reopened.
    #[test]
    fn process_event_works() {
        let db_path = "target/testdbs/event_works";
        test_util::cleanup_db(&db_path);

        // First session: ingest the event, then drop the handle at the end
        // of the scope so the database is closed before it is reopened.
        {
            let writer = Ndb::new(db_path, &Config::new()).expect("ndb");
            writer.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
        }

        // Second session: reopen and look the note up by id.
        {
            let reader = Ndb::new(db_path, &Config::new()).expect("ndb");
            let raw_id =
                hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
                    .expect("hex id");
            let txn = Transaction::new(&reader).expect("txn");
            let id_bytes: [u8; 32] = raw_id.try_into().expect("id bytes");
            let note = reader.get_note_by_id(&txn, &id_bytes).expect("note");
            assert_eq!(note.kind(), 1);
        }
    }
    491 
    /// Opening with a 32 TiB mapsize must still succeed, leaving a non-empty
    /// `data.mdb` behind. Only compiled on Windows.
    #[test]
    #[cfg(target_os = "windows")]
    fn test_windows_large_mapsize() {
        use std::{fs, path::Path};

        let db_path = "target/testdbs/windows_large_mapsize";
        test_util::cleanup_db(&db_path);

        {
            // 32 TiB should be way too big for CI
            let huge_mapsize = 1024usize * 1024usize * 1024usize * 1024usize * 32usize;
            let config = Config::new().set_mapsize(huge_mapsize);

            // in this case, nostrdb should try to keep resizing to
            // smaller mapsizes until success
            let opened = Ndb::new(db_path, &config);

            assert!(opened.is_ok());
        }

        let data_file = Path::new(db_path).join("data.mdb");
        let data_len = fs::metadata(data_file).expect("metadata").len();

        assert!(data_len > 0);

        if cfg!(target_os = "windows") {
            // on windows the default mapsize will be 1MB when we fail
            // to open it
            assert_ne!(data_len, 1048576);
        } else {
            assert!(data_len < 1024u64 * 1024u64);
        }

        // we should definitely clean this up... especially on windows
        test_util::cleanup_db(&db_path);
    }
    530 }