nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (38487B)


      1 use std::ffi::CString;
      2 use std::ptr;
      3 
      4 use crate::bindings::ndb_search;
      5 use crate::{
      6     bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, NoteMetadata,
      7     ProfileKey, ProfileRecord, QueryResult, Result, Subscription, SubscriptionState,
      8     SubscriptionStream, Transaction,
      9 };
     10 use futures::StreamExt;
     11 use std::collections::hash_map::Entry;
     12 use std::collections::HashMap;
     13 use std::fs;
     14 use std::os::raw::c_int;
     15 use std::path::Path;
     16 use std::sync::{Arc, Mutex};
     17 use tracing::debug;
     18 
/// Owner of the raw resources behind an [Ndb] handle.
///
/// [Ndb] keeps this inside an `Arc`, and the `Drop` impl of this type is what
/// tears the resources down.
#[derive(Debug)]
struct NdbRef {
    /// Raw nostrdb context pointer, filled in by `ndb_init` in [Ndb::new].
    ndb: *mut bindings::ndb,
    /// Subscription callback context taken from the config used to open the
    /// database; released when this value is dropped. May be null.
    rust_cb_ctx: *mut ::std::os::raw::c_void,
}
     24 
/// SAFETY: thread safety is ensured by nostrdb
// NOTE(review): the raw pointers in `NdbRef` are not `Send` on their own, so
// this impl relies entirely on the guarantee stated above.
unsafe impl Send for NdbRef {}

/// SAFETY: thread safety is ensured by nostrdb
// NOTE(review): same reliance on nostrdb's guarantee as the `Send` impl.
unsafe impl Sync for NdbRef {}
     30 
     31 /// The database is automatically closed when [Ndb] is [Drop]ped.
     32 impl Drop for NdbRef {
     33     fn drop(&mut self) {
     34         unsafe {
     35             bindings::ndb_destroy(self.ndb);
     36 
     37             if !self.rust_cb_ctx.is_null() {
     38                 // Rebuild the Box from the raw pointer and drop it.
     39                 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
     40             }
     41         }
     42     }
     43 }
     44 
/// Per-subscription state, keyed by subscription handle.
type SubMap = HashMap<Subscription, SubscriptionState>;
     46 
/// A nostrdb context. Construct one of these with [Ndb::new].
///
/// Cloning is cheap: both fields are `Arc`s, so every clone refers to the
/// same database and the same subscription map.
#[derive(Debug, Clone)]
pub struct Ndb {
    /// Shared owner of the raw database pointer and callback context.
    refs: Arc<NdbRef>,

    /// Track query future states
    pub(crate) subs: Arc<Mutex<SubMap>>,
}
     55 
     56 impl Ndb {
     57     /// Construct a new nostrdb context. Takes a directory where the database
     58     /// is/will be located and a nostrdb config.
     59     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     60         let db_dir_cstr = match CString::new(db_dir) {
     61             Ok(cstr) => cstr,
     62             Err(_) => return Err(Error::DbOpenFailed),
     63         };
     64         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     65 
     66         let path = Path::new(db_dir);
     67         if !path.exists() {
     68             let _ = fs::create_dir_all(path);
     69         }
     70 
     71         let min_mapsize = 1024 * 1024 * 512;
     72         let mut mapsize = config.config.mapsize;
     73         let config = *config;
     74 
     75         let prev_callback = config.config.sub_cb;
     76         let prev_callback_ctx = config.config.sub_cb_ctx;
     77         let subs = Arc::new(Mutex::new(SubMap::default()));
     78         let subs_clone = subs.clone();
     79 
     80         // We need to register our own callback so that we can wake
     81         // query futures
     82         let mut config = config.set_sub_callback(move |sub_id: u64| {
     83             let mut map = subs_clone.lock().unwrap();
     84             if let Some(s) = map.get_mut(&Subscription::new(sub_id)) {
     85                 if let Some(w) = s.waker.take() {
     86                     w.wake();
     87                 }
     88             }
     89 
     90             if let Some(pcb) = prev_callback {
     91                 unsafe {
     92                     pcb(prev_callback_ctx, sub_id);
     93                 };
     94             }
     95         });
     96 
     97         let result = loop {
     98             let result =
     99                 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
    100 
    101             if result == 0 {
    102                 mapsize /= 2;
    103                 config = config.set_mapsize(mapsize);
    104                 debug!("ndb init failed, reducing mapsize to {}", mapsize);
    105 
    106                 if mapsize > min_mapsize {
    107                     continue;
    108                 } else {
    109                     break 0;
    110                 }
    111             } else {
    112                 break result;
    113             }
    114         };
    115 
    116         if result == 0 {
    117             return Err(Error::DbOpenFailed);
    118         }
    119 
    120         let rust_cb_ctx = config.config.sub_cb_ctx;
    121         let refs = Arc::new(NdbRef { ndb, rust_cb_ctx });
    122 
    123         Ok(Ndb { refs, subs })
    124     }
    125 
    126     /// Ingest a relay or client sent event, with optional relay metadata.
    127     /// This function returns immediately and doesn't provide any information on
    128     /// if ingestion was successful or not.
    129     pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> {
    130         // Convert the Rust string to a C-style string
    131         let c_json = CString::new(json)?;
    132         let c_json_ptr = c_json.as_ptr();
    133 
    134         // Get the length of the string
    135         let len = json.len() as libc::c_int;
    136 
    137         let res = unsafe {
    138             bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr())
    139         };
    140 
    141         if res == 0 {
    142             return Err(Error::NoteProcessFailed);
    143         }
    144 
    145         Ok(())
    146     }
    147 
    /// Ingest a relay-sent event in the form `["EVENT","subid", {"id":...}]`
    /// This function returns immediately and doesn't provide any information on
    /// if ingestion was successful or not.
    ///
    /// Delegates to [`Ndb::process_event_with`] with the client flag unset.
    pub fn process_event(&self, json: &str) -> Result<()> {
        self.process_event_with(json, IngestMetadata::new().client(false))
    }
    154 
    /// Ingest a client-sent event in the form `["EVENT", {"id":...}]`
    /// This function returns immediately and doesn't provide any information on
    /// if ingestion was successful or not.
    ///
    /// Delegates to [`Ndb::process_event_with`] with the client flag set.
    pub fn process_client_event(&self, json: &str) -> Result<()> {
        self.process_event_with(json, IngestMetadata::new().client(true))
    }
    161 
    162     pub fn query<'a>(
    163         &self,
    164         txn: &'a Transaction,
    165         filters: &[Filter],
    166         max_results: i32,
    167     ) -> Result<Vec<QueryResult<'a>>> {
    168         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
    169         let mut out: Vec<bindings::ndb_query_result> = vec![];
    170         let mut returned: i32 = 0;
    171         out.reserve_exact(max_results as usize);
    172         let res = unsafe {
    173             bindings::ndb_query(
    174                 txn.as_mut_ptr(),
    175                 ndb_filters.as_mut_ptr(),
    176                 ndb_filters.len() as i32,
    177                 out.as_mut_ptr(),
    178                 max_results,
    179                 &mut returned as *mut i32,
    180             )
    181         };
    182         if res == 1 {
    183             unsafe {
    184                 out.set_len(returned as usize);
    185             };
    186             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    187         } else {
    188             Err(Error::QueryError)
    189         }
    190     }
    191 
    /// Number of subscriptions as reported by `ndb_num_subscriptions`,
    /// converted to `u32`.
    pub fn subscription_count(&self) -> u32 {
        unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
    }
    195 
    196     pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> {
    197         let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) };
    198 
    199         debug!(
    200             "unsubscribed from {}, sub count {}",
    201             sub.id(),
    202             self.subscription_count()
    203         );
    204 
    205         // mark the subscription as done if it exists in our stream map
    206         {
    207             let mut map = self.subs.lock().unwrap();
    208             if let Entry::Occupied(mut entry) = map.entry(sub) {
    209                 entry.get_mut().done = true;
    210             }
    211         }
    212 
    213         if r == 0 {
    214             Err(Error::SubscriptionError)
    215         } else {
    216             Ok(())
    217         }
    218     }
    219 
    220     pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> {
    221         unsafe {
    222             let mut ndb_filters: Vec<bindings::ndb_filter> =
    223                 filters.iter().map(|a| a.data).collect();
    224             let id = bindings::ndb_subscribe(
    225                 self.as_ptr(),
    226                 ndb_filters.as_mut_ptr(),
    227                 filters.len() as i32,
    228             );
    229             if id == 0 {
    230                 Err(Error::SubscriptionError)
    231             } else {
    232                 Ok(Subscription::new(id))
    233             }
    234         }
    235     }
    236 
    237     pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> {
    238         let mut vec = vec![];
    239         vec.reserve_exact(max_notes as usize);
    240 
    241         unsafe {
    242             let res = bindings::ndb_poll_for_notes(
    243                 self.as_ptr(),
    244                 sub.id(),
    245                 vec.as_mut_ptr(),
    246                 max_notes as c_int,
    247             );
    248             vec.set_len(res as usize);
    249         };
    250 
    251         vec.into_iter().map(NoteKey::new).collect()
    252     }
    253 
    254     pub async fn wait_for_notes(
    255         &self,
    256         sub_id: Subscription,
    257         max_notes: u32,
    258     ) -> Result<Vec<NoteKey>> {
    259         let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes);
    260 
    261         match stream.next().await {
    262             Some(res) => Ok(res),
    263             None => Err(Error::SubscriptionError),
    264         }
    265     }
    266 
    267     pub fn get_profile_by_key<'a>(
    268         &self,
    269         transaction: &'a Transaction,
    270         key: ProfileKey,
    271     ) -> Result<ProfileRecord<'a>> {
    272         let mut len: usize = 0;
    273 
    274         let profile_record_ptr = unsafe {
    275             bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
    276         };
    277 
    278         if profile_record_ptr.is_null() {
    279             // Handle null pointer (e.g., note not found or error occurred)
    280             return Err(Error::NotFound);
    281         }
    282 
    283         // Convert the raw pointer to a Note instance
    284         Ok(ProfileRecord::new_transactional(
    285             profile_record_ptr,
    286             len,
    287             key,
    288             transaction,
    289         ))
    290     }
    291 
    292     pub fn get_profile_by_pubkey<'a>(
    293         &self,
    294         transaction: &'a Transaction,
    295         id: &[u8; 32],
    296     ) -> Result<ProfileRecord<'a>> {
    297         let mut len: usize = 0;
    298         let mut primkey: u64 = 0;
    299 
    300         let profile_record_ptr = unsafe {
    301             bindings::ndb_get_profile_by_pubkey(
    302                 transaction.as_mut_ptr(),
    303                 id.as_ptr(),
    304                 &mut len,
    305                 &mut primkey,
    306             )
    307         };
    308 
    309         if profile_record_ptr.is_null() {
    310             // Handle null pointer (e.g., note not found or error occurred)
    311             return Err(Error::NotFound);
    312         }
    313 
    314         // Convert the raw pointer to a Note instance
    315         Ok(ProfileRecord::new_transactional(
    316             profile_record_ptr,
    317             len,
    318             ProfileKey::new(primkey),
    319             transaction,
    320         ))
    321     }
    322 
    323     pub fn get_note_metadata<'a>(
    324         &self,
    325         txn: &'a Transaction,
    326         id: &[u8; 32],
    327     ) -> Result<NoteMetadata<'a>> {
    328         let res = unsafe {
    329             let res = bindings::ndb_get_note_meta(
    330                 txn.as_mut_ptr(),
    331                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    332             );
    333 
    334             if res.is_null() {
    335                 return Err(Error::NotFound);
    336             }
    337 
    338             &mut *res
    339         };
    340 
    341         Ok(NoteMetadata::new(res))
    342     }
    343 
    344     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> {
    345         let res = unsafe {
    346             bindings::ndb_get_notekey_by_id(
    347                 txn.as_mut_ptr(),
    348                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    349             )
    350         };
    351 
    352         if res == 0 {
    353             return Err(Error::NotFound);
    354         }
    355 
    356         Ok(NoteKey::new(res))
    357     }
    358 
    359     pub fn get_profilekey_by_pubkey(
    360         &self,
    361         txn: &Transaction,
    362         pubkey: &[u8; 32],
    363     ) -> Result<ProfileKey> {
    364         let res = unsafe {
    365             bindings::ndb_get_profilekey_by_pubkey(
    366                 txn.as_mut_ptr(),
    367                 pubkey.as_ptr() as *const ::std::os::raw::c_uchar,
    368             )
    369         };
    370 
    371         if res == 0 {
    372             return Err(Error::NotFound);
    373         }
    374 
    375         Ok(ProfileKey::new(res))
    376     }
    377 
    378     pub fn get_blocks_by_key<'a>(
    379         &self,
    380         txn: &'a Transaction,
    381         note_key: NoteKey,
    382     ) -> Result<Blocks<'a>> {
    383         let blocks_ptr = unsafe {
    384             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    385         };
    386 
    387         if blocks_ptr.is_null() {
    388             return Err(Error::NotFound);
    389         }
    390 
    391         Ok(Blocks::new_transactional(blocks_ptr, txn))
    392     }
    393 
    394     pub fn get_note_by_key<'a>(
    395         &self,
    396         transaction: &'a Transaction,
    397         note_key: NoteKey,
    398     ) -> Result<Note<'a>> {
    399         let mut len: usize = 0;
    400 
    401         let note_ptr = unsafe {
    402             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    403         };
    404 
    405         if note_ptr.is_null() {
    406             // Handle null pointer (e.g., note not found or error occurred)
    407             return Err(Error::NotFound);
    408         }
    409 
    410         // Convert the raw pointer to a Note instance
    411         Ok(Note::new_transactional(
    412             note_ptr,
    413             len,
    414             note_key,
    415             transaction,
    416         ))
    417     }
    418 
    419     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    420     pub fn get_note_by_id<'a>(
    421         &self,
    422         transaction: &'a Transaction,
    423         id: &[u8; 32],
    424     ) -> Result<Note<'a>> {
    425         let mut len: usize = 0;
    426         let mut primkey: u64 = 0;
    427 
    428         let note_ptr = unsafe {
    429             bindings::ndb_get_note_by_id(
    430                 transaction.as_mut_ptr(),
    431                 id.as_ptr(),
    432                 &mut len,
    433                 &mut primkey,
    434             )
    435         };
    436 
    437         if note_ptr.is_null() {
    438             // Handle null pointer (e.g., note not found or error occurred)
    439             return Err(Error::NotFound);
    440         }
    441 
    442         // Convert the raw pointer to a Note instance
    443         Ok(Note::new_transactional(
    444             note_ptr,
    445             len,
    446             NoteKey::new(primkey),
    447             transaction,
    448         ))
    449     }
    450 
    451     pub fn search_profile<'a>(
    452         &self,
    453         transaction: &'a Transaction,
    454         search: &str,
    455         limit: u32,
    456     ) -> Result<Vec<&'a [u8; 32]>> {
    457         let mut results = Vec::new();
    458 
    459         let mut ndb_search = ndb_search {
    460             key: std::ptr::null_mut(),
    461             profile_key: 0,
    462             cursor: std::ptr::null_mut(),
    463         };
    464 
    465         let c_query = CString::new(search).map_err(|_| Error::DecodeError)?;
    466 
    467         let success = unsafe {
    468             bindings::ndb_search_profile(
    469                 transaction.as_mut_ptr(),
    470                 &mut ndb_search as *mut ndb_search,
    471                 c_query.as_c_str().as_ptr(),
    472             )
    473         };
    474 
    475         if success == 0 {
    476             return Ok(results);
    477         }
    478 
    479         // Add the first result
    480         if let Some(key) = unsafe { ndb_search.key.as_ref() } {
    481             results.push(&key.id);
    482         }
    483 
    484         // Iterate through additional results up to the limit
    485         let mut remaining = limit;
    486         while remaining > 0 {
    487             let next_success =
    488                 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) };
    489 
    490             if next_success == 0 {
    491                 break;
    492             }
    493 
    494             if let Some(key) = unsafe { ndb_search.key.as_ref() } {
    495                 results.push(&key.id);
    496             }
    497 
    498             remaining -= 1;
    499         }
    500 
    501         unsafe {
    502             bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search);
    503         }
    504 
    505         Ok(results)
    506     }
    507 
    /// Get the underlying pointer to the context in C
    ///
    /// The pointer is the one stored in the shared [NdbRef], so it is the same
    /// for every clone of this [Ndb].
    pub fn as_ptr(&self) -> *mut bindings::ndb {
        self.refs.ndb
    }
    512 }
    513 
    514 #[cfg(test)]
    515 mod tests {
    516     use super::*;
    517     use crate::config::Config;
    518     use crate::test_util;
    519     use tokio::time::{self, sleep, Duration};
    520 
    521     #[test]
    522     fn ndb_init_works() {
    523         let db = "target/testdbs/init_works";
    524         test_util::cleanup_db(db);
    525 
    526         {
    527             let cfg = Config::new();
    528             let _ = Ndb::new(db, &cfg).expect("ok");
    529         }
    530     }
    531 
    532     #[tokio::test]
    533     async fn query_works() {
    534         let db = "target/testdbs/query";
    535         test_util::cleanup_db(&db);
    536 
    537         {
    538             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    539 
    540             let filter = Filter::new().kinds(vec![1]).build();
    541             let filters = vec![filter];
    542 
    543             let sub = ndb.subscribe(&filters).expect("sub_id");
    544             let waiter = ndb.wait_for_notes(sub, 1);
    545             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    546             let res = waiter.await.expect("await ok");
    547             assert_eq!(res, vec![NoteKey::new(1)]);
    548             let txn = Transaction::new(&ndb).expect("txn");
    549             let res = ndb.query(&txn, &filters, 1).expect("query ok");
    550             assert_eq!(res.len(), 1);
    551             assert_eq!(
    552                 hex::encode(res[0].note.id()),
    553                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    554             );
    555         }
    556     }
    557 
    558     #[tokio::test]
    559     async fn search_profile_works() {
    560         let db = "target/testdbs/search_profile";
    561         test_util::cleanup_db(&db);
    562 
    563         {
    564             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    565 
    566             let filter = Filter::new().kinds(vec![0]).build();
    567             let filters = vec![filter];
    568 
    569             let sub_id = ndb.subscribe(&filters).expect("sub_id");
    570             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    571 
    572             ndb.process_event(r#"["EVENT","b",{  "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c",  "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24",  "created_at": 1736785355,  "kind": 0,  "tags": [    [      "alt",      "User profile for Derek Ross"    ],    [      "i",      "twitter:derekmross",      "1634343988407726081"    ],    [      "i",      "github:derekross",      "3edaf845975fa4500496a15039323fa3I"    ]  ],  "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}",  "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap();
    573 
    574             ndb.process_event(r#"["EVENT","b",{  "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf",  "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967",  "created_at": 1736017863,  "kind": 0,  "tags": [    [      "client",      "Damus Notedeck"    ]  ],  "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}",  "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap();
    575 
    576             for _ in 0..2 {
    577                 let _ = sub.next().await;
    578             }
    579             let txn = Transaction::new(&ndb).expect("txn");
    580 
    581             let res = ndb.search_profile(&txn, "kernel", 1);
    582             assert!(res.is_ok());
    583             let res = res.unwrap();
    584             assert!(res.len() >= 1);
    585             let kernelkind_bytes: [u8; 32] = [
    586                 0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57,
    587                 0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55,
    588                 0x5f, 0x57, 0x49, 0x67,
    589             ];
    590             assert_eq!(kernelkind_bytes, **res.first().unwrap());
    591 
    592             let res = ndb.search_profile(&txn, "Derek", 1);
    593             assert!(res.is_ok());
    594             let res = res.unwrap();
    595             assert!(res.len() >= 1);
    596             let derek_bytes: [u8; 32] = [
    597                 0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23,
    598                 0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87,
    599                 0x7a, 0x74, 0x5b, 0x24,
    600             ];
    601             assert_eq!(derek_bytes, **res.first().unwrap());
    602         }
    603     }
    604 
    605     #[tokio::test]
    606     async fn subscribe_event_works() {
    607         let db = "target/testdbs/subscribe";
    608         test_util::cleanup_db(&db);
    609 
    610         {
    611             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    612 
    613             let filter = Filter::new().kinds(vec![1]).build();
    614 
    615             let sub = ndb.subscribe(&[filter]).expect("sub_id");
    616             let waiter = ndb.wait_for_notes(sub, 1);
    617             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    618             let res = waiter.await.expect("await ok");
    619             assert_eq!(res, vec![NoteKey::new(1)]);
    620         }
    621     }
    622 
    623     #[tokio::test]
    624     async fn multiple_events_work() {
    625         let db = "target/testdbs/multiple_events";
    626         test_util::cleanup_db(&db);
    627 
    628         {
    629             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    630 
    631             let filter = Filter::new().kinds(vec![1]).build();
    632 
    633             let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
    634             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    635 
    636             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    637             ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
    638             ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
    639 
    640             // this pause causes problems
    641             sleep(Duration::from_millis(100)).await;
    642 
    643             ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
    644             ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
    645             ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
    646 
    647             let timeout_duration = Duration::from_secs(2);
    648             let result = time::timeout(timeout_duration, async {
    649                 let mut count = 0;
    650                 while count < 6 {
    651                     let res = sub.next();
    652                     let _ = res.await.expect("await ok");
    653                     count += 1;
    654                     println!("saw an event, count = {}", count);
    655                 }
    656             })
    657             .await;
    658 
    659             match result {
    660                 Ok(_) => println!("Test completed successfully"),
    661                 Err(_) => panic!("Test timed out"),
    662             }
    663         }
    664     }
    665 
    666     #[tokio::test]
    667     async fn multiple_events_with_final_pause_work() {
    668         let db = "target/testdbs/multiple_events_with_final_pause";
    669         test_util::cleanup_db(&db);
    670 
    671         {
    672             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    673 
    674             let filter = Filter::new().kinds(vec![1]).build();
    675 
    676             let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
    677             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    678 
    679             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    680             ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
    681             ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
    682 
    683             sleep(Duration::from_millis(100)).await;
    684 
    685             ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
    686             ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
    687             ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
    688 
    689             // this final pause causes extra problems
    690             sleep(Duration::from_millis(100)).await;
    691 
    692             let timeout_duration = Duration::from_secs(2);
    693             let result = time::timeout(timeout_duration, async {
    694                 let mut count = 0;
    695                 while count < 6 {
    696                     let res = sub.next();
    697                     let _ = res.await.expect("await ok");
    698                     count += 1;
    699                     println!("saw an event, count = {}", count);
    700                 }
    701             })
    702             .await;
    703 
    704             match result {
    705                 Ok(_) => println!("Test completed successfully"),
    706                 Err(_) => panic!("Test timed out"),
    707             }
    708         }
    709     }
    710 
    /// Subscribes to kind-1 notes, ingests a single event and polls the
    /// subscription twice: once immediately (expecting nothing yet) and once
    /// after a 150ms sleep (expecting the note with key 1).
    #[test]
    fn poll_note_works() {
        let db = "target/testdbs/poll";
        test_util::cleanup_db(&db);

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");

            let kind_one = Filter::new().kinds(vec![1]).build();
            let sub = ndb.subscribe(&[kind_one]).expect("sub_id");

            ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");

            // Polling right away is too fast: nothing is expected yet.
            assert_eq!(ndb.poll_for_notes(sub, 1), vec![]);

            // Wait before polling again.
            let settle_time = std::time::Duration::from_millis(150);
            std::thread::sleep(settle_time);

            // Now the note should be visible to the subscription.
            assert_eq!(ndb.poll_for_notes(sub, 1), vec![NoteKey::new(1)]);
        }
    }
    733 
    /// Ingests one event, drops the database handle, reopens the same path
    /// and looks the note up by its id, checking that its kind is 1.
    #[test]
    fn process_event_works() {
        let db = "target/testdbs/event_works";
        test_util::cleanup_db(&db);

        // First handle: ingest only; it is dropped at the end of this scope.
        {
            let writer = Ndb::new(db, &Config::new()).expect("ndb");
            writer.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
        }

        // Second handle: read the note back through a fresh transaction.
        {
            // Same id as the event ingested above, decoded to raw bytes.
            let id_bytes: [u8; 32] =
                hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
                    .expect("hex id")
                    .try_into()
                    .expect("id bytes");

            let reader = Ndb::new(db, &Config::new()).expect("ndb");
            let mut txn = Transaction::new(&reader).expect("txn");
            let stored = reader.get_note_by_id(&mut txn, &id_bytes).expect("note");
            assert_eq!(stored.kind(), 1);
        }
    }
    755 
    /// Windows-only: opens a database configured with a 32 TiB mapsize and
    /// checks that [`Ndb::new`] still succeeds, then inspects the size of the
    /// resulting `data.mdb` file.
    ///
    /// The whole function is compiled only for `target_os = "windows"`, so no
    /// runtime platform check is needed inside it.
    #[test]
    #[cfg(target_os = "windows")]
    fn test_windows_large_mapsize() {
        use std::{fs, path::Path};

        let db = "target/testdbs/windows_large_mapsize";
        test_util::cleanup_db(&db);

        // The handle lives in its own scope so it is dropped before the data
        // file is inspected and removed.
        {
            // 32 TiB should be way too big for CI
            let config =
                Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize);

            // in this case, nostrdb should try to keep resizing to
            // smaller mapsizes until success
            let ndb = Ndb::new(db, &config);

            assert!(ndb.is_ok(), "opening with an oversized mapsize failed");
        }

        let file_len = fs::metadata(Path::new(db).join("data.mdb"))
            .expect("metadata")
            .len();

        assert!(file_len > 0);

        // on windows the default mapsize will be 1MB when we fail
        // to open it
        assert_ne!(file_len, 1048576);

        // we should definitely clean this up... especially on windows
        test_util::cleanup_db(&db);
    }
    794 
    795     #[tokio::test]
    796     async fn test_unsub_on_drop() {
    797         let db = "target/testdbs/test_unsub_on_drop";
    798         test_util::cleanup_db(&db);
    799 
    800         {
    801             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    802             let sub_id = {
    803                 let filter = Filter::new().kinds(vec![1]).build();
    804                 let filters = vec![filter];
    805 
    806                 let sub_id = ndb.subscribe(&filters).expect("sub_id");
    807                 let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    808 
    809                 let res = sub.next();
    810 
    811                 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    812 
    813                 let res = res.await.expect("await ok");
    814                 assert_eq!(res, vec![NoteKey::new(1)]);
    815 
    816                 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
    817                 sub_id
    818             };
    819 
    820             // ensure subscription state is removed after stream is dropped
    821             assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
    822             assert_eq!(ndb.subscription_count(), 0);
    823         }
    824 
    825         test_util::cleanup_db(&db);
    826     }
    827 
    828     #[tokio::test]
    829     async fn test_stream() {
    830         let db = "target/testdbs/test_stream";
    831         test_util::cleanup_db(&db);
    832 
    833         {
    834             let mut ndb = Ndb::new(db, &Config::new()).expect("ndb");
    835             let sub_id = {
    836                 let filter = Filter::new().kinds(vec![1]).build();
    837                 let filters = vec![filter];
    838 
    839                 let sub_id = ndb.subscribe(&filters).expect("sub_id");
    840                 let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    841 
    842                 let res = sub.next();
    843 
    844                 let _ = ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    845 
    846                 let _ = ndb.process_event(r#"["EVENT",{"content":"👀","created_at":1761514455,"id":"66af95a6bdfec756344f48241562b684082ff9c76ea940c11c4fd85e91e1219c","kind":7,"pubkey":"d5805ae449e108e907091c67cdf49a9835b3cac3dd11489ad215c0ddf7c658fc","sig":"69f4a3fe7c1cc6aa9c9cc4a2e90e4b71c3b9afaad262e68b92336e0493ff1a748b5dcc20ab6e86d4551dc5ea680ddfa1c08d47f9e4845927e143e8ef2183479b","tags":[["e","d44ad96cb8924092a76bc2afddeb12eb85233c0d03a7d9adc42c2a85a79a4305","wss://relay.primal.net/","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],["p","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9","wss://relay.primal.net/"],["k","1"]]}]"#);
    847                 let res = res.await.expect("await ok");
    848                 assert_eq!(res, vec![NoteKey::new(1)]);
    849 
    850                 // ensure that unsubscribing kills the stream
    851                 assert!(ndb.unsubscribe(sub_id).is_ok());
    852                 assert!(sub.next().await.is_none());
    853 
    854                 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
    855                 sub_id
    856             };
    857 
    858             // ensure subscription state is removed after stream is dropped
    859             assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
    860         }
    861 
    862         test_util::cleanup_db(&db);
    863     }
    864 }