nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (38972B)


      1 use std::ffi::CString;
      2 use std::ptr;
      3 
      4 use crate::bindings::ndb_search;
      5 use crate::{
      6     bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, NoteMetadata,
      7     ProfileKey, ProfileRecord, QueryResult, Result, Subscription, SubscriptionState,
      8     SubscriptionStream, Transaction,
      9 };
     10 use futures::StreamExt;
     11 use std::collections::hash_map::Entry;
     12 use std::collections::HashMap;
     13 use std::fs;
     14 use std::os::raw::c_int;
     15 use std::path::Path;
     16 use std::sync::{Arc, Mutex};
     17 use tracing::debug;
     18 
     19 #[derive(Debug)]
     20 struct NdbRef {
     21     ndb: *mut bindings::ndb,
     22     rust_cb_ctx: *mut ::std::os::raw::c_void,
     23 }
     24 
     25 /// SAFETY: thread safety is ensured by nostrdb
     26 unsafe impl Send for NdbRef {}
     27 
     28 /// SAFETY: thread safety is ensured by nostrdb
     29 unsafe impl Sync for NdbRef {}
     30 
     31 /// The database is automatically closed when [Ndb] is [Drop]ped.
     32 impl Drop for NdbRef {
     33     fn drop(&mut self) {
     34         unsafe {
     35             bindings::ndb_destroy(self.ndb);
     36 
     37             if !self.rust_cb_ctx.is_null() {
     38                 // Rebuild the Box from the raw pointer and drop it.
     39                 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
     40             }
     41         }
     42     }
     43 }
     44 
/// Subscription bookkeeping, keyed by subscription handle.
type SubMap = HashMap<Subscription, SubscriptionState>;

/// A nostrdb context. Construct one of these with [Ndb::new].
///
/// Both fields are reference counted, so a clone refers to the same
/// underlying handle and the same subscription map.
#[derive(Debug, Clone)]
pub struct Ndb {
    // Shared raw handles; see `NdbRef`'s `Drop` impl for teardown.
    refs: Arc<NdbRef>,

    /// Track query future states
    pub(crate) subs: Arc<Mutex<SubMap>>,
}
     55 
     56 impl Ndb {
     57     /// Construct a new nostrdb context. Takes a directory where the database
     58     /// is/will be located and a nostrdb config.
     59     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     60         let db_dir_cstr = match CString::new(db_dir) {
     61             Ok(cstr) => cstr,
     62             Err(_) => return Err(Error::DbOpenFailed),
     63         };
     64         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     65 
     66         let path = Path::new(db_dir);
     67         if !path.exists() {
     68             let _ = fs::create_dir_all(path);
     69         }
     70 
     71         let min_mapsize = 1024 * 1024 * 512;
     72         let mut mapsize = config.config.mapsize;
     73         let config = *config;
     74 
     75         let prev_callback = config.config.sub_cb;
     76         let prev_callback_ctx = config.config.sub_cb_ctx;
     77         let subs = Arc::new(Mutex::new(SubMap::default()));
     78         let subs_clone = subs.clone();
     79 
     80         // We need to register our own callback so that we can wake
     81         // query futures
     82         let mut config = config.set_sub_callback(move |sub_id: u64| {
     83             let mut map = subs_clone.lock().unwrap();
     84             if let Some(s) = map.get_mut(&Subscription::new(sub_id)) {
     85                 if let Some(w) = s.waker.take() {
     86                     w.wake();
     87                 }
     88             }
     89 
     90             if let Some(pcb) = prev_callback {
     91                 unsafe {
     92                     pcb(prev_callback_ctx, sub_id);
     93                 };
     94             }
     95         });
     96 
     97         let result = loop {
     98             let result =
     99                 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
    100 
    101             if result == 0 {
    102                 mapsize /= 2;
    103                 config = config.set_mapsize(mapsize);
    104                 debug!("ndb init failed, reducing mapsize to {}", mapsize);
    105 
    106                 if mapsize > min_mapsize {
    107                     continue;
    108                 } else {
    109                     break 0;
    110                 }
    111             } else {
    112                 break result;
    113             }
    114         };
    115 
    116         if result == 0 {
    117             return Err(Error::DbOpenFailed);
    118         }
    119 
    120         let rust_cb_ctx = config.config.sub_cb_ctx;
    121         let refs = Arc::new(NdbRef { ndb, rust_cb_ctx });
    122 
    123         Ok(Ndb { refs, subs })
    124     }
    125 
    126     /// Ingest a relay or client sent event, with optional relay metadata.
    127     /// This function returns immediately and doesn't provide any information on
    128     /// if ingestion was successful or not.
    129     pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> {
    130         // Convert the Rust string to a C-style string
    131         let c_json = CString::new(json)?;
    132         let c_json_ptr = c_json.as_ptr();
    133 
    134         // Get the length of the string
    135         let len = json.len() as libc::c_int;
    136 
    137         let res = unsafe {
    138             bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr())
    139         };
    140 
    141         if res == 0 {
    142             return Err(Error::NoteProcessFailed);
    143         }
    144 
    145         Ok(())
    146     }
    147 
    148     /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
    149     /// This function returns immediately and doesn't provide any information on
    150     /// if ingestion was successful or not.
    151     pub fn process_event(&self, json: &str) -> Result<()> {
    152         self.process_event_with(json, IngestMetadata::new().client(false))
    153     }
    154 
    155     /// Ingest a client-sent event in the form `["EVENT", {"id:"...}]`
    156     /// This function returns immediately and doesn't provide any information on
    157     /// if ingestion was successful or not.
    158     pub fn process_client_event(&self, json: &str) -> Result<()> {
    159         self.process_event_with(json, IngestMetadata::new().client(true))
    160     }
    161 
    162     /// Attempt to unwrap any unprocessed giftwraps
    163     pub fn process_giftwraps(&self, txn: &Transaction) {
    164         unsafe {
    165             bindings::ndb_process_giftwraps(self.as_ptr(), txn.as_mut_ptr());
    166         }
    167     }
    168 
    169     /// Add a secret key to nostrdb's note ingester threads so that
    170     /// nostrdb can unwrap incoming giftwraps.
    171     pub fn add_key(&self, key: &[u8; 32]) -> bool {
    172         unsafe { bindings::ndb_add_key(self.as_ptr(), key as *const u8 as *mut u8) != 0 }
    173     }
    174 
    175     pub fn query<'a>(
    176         &self,
    177         txn: &'a Transaction,
    178         filters: &[Filter],
    179         max_results: i32,
    180     ) -> Result<Vec<QueryResult<'a>>> {
    181         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
    182         let mut out: Vec<bindings::ndb_query_result> = vec![];
    183         let mut returned: i32 = 0;
    184         out.reserve_exact(max_results as usize);
    185         let res = unsafe {
    186             bindings::ndb_query(
    187                 txn.as_mut_ptr(),
    188                 ndb_filters.as_mut_ptr(),
    189                 ndb_filters.len() as i32,
    190                 out.as_mut_ptr(),
    191                 max_results,
    192                 &mut returned as *mut i32,
    193             )
    194         };
    195         if res == 1 {
    196             unsafe {
    197                 out.set_len(returned as usize);
    198             };
    199             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    200         } else {
    201             Err(Error::QueryError)
    202         }
    203     }
    204 
    205     pub fn subscription_count(&self) -> u32 {
    206         unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
    207     }
    208 
    209     pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> {
    210         let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) };
    211 
    212         debug!(
    213             "unsubscribed from {}, sub count {}",
    214             sub.id(),
    215             self.subscription_count()
    216         );
    217 
    218         // mark the subscription as done if it exists in our stream map
    219         {
    220             let mut map = self.subs.lock().unwrap();
    221             if let Entry::Occupied(mut entry) = map.entry(sub) {
    222                 entry.get_mut().done = true;
    223             }
    224         }
    225 
    226         if r == 0 {
    227             Err(Error::SubscriptionError)
    228         } else {
    229             Ok(())
    230         }
    231     }
    232 
    233     pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> {
    234         unsafe {
    235             let mut ndb_filters: Vec<bindings::ndb_filter> =
    236                 filters.iter().map(|a| a.data).collect();
    237             let id = bindings::ndb_subscribe(
    238                 self.as_ptr(),
    239                 ndb_filters.as_mut_ptr(),
    240                 filters.len() as i32,
    241             );
    242             if id == 0 {
    243                 Err(Error::SubscriptionError)
    244             } else {
    245                 Ok(Subscription::new(id))
    246             }
    247         }
    248     }
    249 
    250     pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> {
    251         let mut vec = vec![];
    252         vec.reserve_exact(max_notes as usize);
    253 
    254         unsafe {
    255             let res = bindings::ndb_poll_for_notes(
    256                 self.as_ptr(),
    257                 sub.id(),
    258                 vec.as_mut_ptr(),
    259                 max_notes as c_int,
    260             );
    261             vec.set_len(res as usize);
    262         };
    263 
    264         vec.into_iter().map(NoteKey::new).collect()
    265     }
    266 
    267     pub async fn wait_for_notes(
    268         &self,
    269         sub_id: Subscription,
    270         max_notes: u32,
    271     ) -> Result<Vec<NoteKey>> {
    272         let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes);
    273 
    274         match stream.next().await {
    275             Some(res) => Ok(res),
    276             None => Err(Error::SubscriptionError),
    277         }
    278     }
    279 
    280     pub fn get_profile_by_key<'a>(
    281         &self,
    282         transaction: &'a Transaction,
    283         key: ProfileKey,
    284     ) -> Result<ProfileRecord<'a>> {
    285         let mut len: usize = 0;
    286 
    287         let profile_record_ptr = unsafe {
    288             bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
    289         };
    290 
    291         if profile_record_ptr.is_null() {
    292             // Handle null pointer (e.g., note not found or error occurred)
    293             return Err(Error::NotFound);
    294         }
    295 
    296         // Convert the raw pointer to a Note instance
    297         Ok(ProfileRecord::new_transactional(
    298             profile_record_ptr,
    299             len,
    300             key,
    301             transaction,
    302         ))
    303     }
    304 
    305     pub fn get_profile_by_pubkey<'a>(
    306         &self,
    307         transaction: &'a Transaction,
    308         id: &[u8; 32],
    309     ) -> Result<ProfileRecord<'a>> {
    310         let mut len: usize = 0;
    311         let mut primkey: u64 = 0;
    312 
    313         let profile_record_ptr = unsafe {
    314             bindings::ndb_get_profile_by_pubkey(
    315                 transaction.as_mut_ptr(),
    316                 id.as_ptr(),
    317                 &mut len,
    318                 &mut primkey,
    319             )
    320         };
    321 
    322         if profile_record_ptr.is_null() {
    323             // Handle null pointer (e.g., note not found or error occurred)
    324             return Err(Error::NotFound);
    325         }
    326 
    327         // Convert the raw pointer to a Note instance
    328         Ok(ProfileRecord::new_transactional(
    329             profile_record_ptr,
    330             len,
    331             ProfileKey::new(primkey),
    332             transaction,
    333         ))
    334     }
    335 
    336     pub fn get_note_metadata<'a>(
    337         &self,
    338         txn: &'a Transaction,
    339         id: &[u8; 32],
    340     ) -> Result<NoteMetadata<'a>> {
    341         let res = unsafe {
    342             let res = bindings::ndb_get_note_meta(
    343                 txn.as_mut_ptr(),
    344                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    345             );
    346 
    347             if res.is_null() {
    348                 return Err(Error::NotFound);
    349             }
    350 
    351             &mut *res
    352         };
    353 
    354         Ok(NoteMetadata::new(res))
    355     }
    356 
    357     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> {
    358         let res = unsafe {
    359             bindings::ndb_get_notekey_by_id(
    360                 txn.as_mut_ptr(),
    361                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    362             )
    363         };
    364 
    365         if res == 0 {
    366             return Err(Error::NotFound);
    367         }
    368 
    369         Ok(NoteKey::new(res))
    370     }
    371 
    372     pub fn get_profilekey_by_pubkey(
    373         &self,
    374         txn: &Transaction,
    375         pubkey: &[u8; 32],
    376     ) -> Result<ProfileKey> {
    377         let res = unsafe {
    378             bindings::ndb_get_profilekey_by_pubkey(
    379                 txn.as_mut_ptr(),
    380                 pubkey.as_ptr() as *const ::std::os::raw::c_uchar,
    381             )
    382         };
    383 
    384         if res == 0 {
    385             return Err(Error::NotFound);
    386         }
    387 
    388         Ok(ProfileKey::new(res))
    389     }
    390 
    391     pub fn get_blocks_by_key<'a>(
    392         &self,
    393         txn: &'a Transaction,
    394         note_key: NoteKey,
    395     ) -> Result<Blocks<'a>> {
    396         let blocks_ptr = unsafe {
    397             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    398         };
    399 
    400         if blocks_ptr.is_null() {
    401             return Err(Error::NotFound);
    402         }
    403 
    404         Ok(Blocks::new_transactional(blocks_ptr, txn))
    405     }
    406 
    407     pub fn get_note_by_key<'a>(
    408         &self,
    409         transaction: &'a Transaction,
    410         note_key: NoteKey,
    411     ) -> Result<Note<'a>> {
    412         let mut len: usize = 0;
    413 
    414         let note_ptr = unsafe {
    415             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    416         };
    417 
    418         if note_ptr.is_null() {
    419             // Handle null pointer (e.g., note not found or error occurred)
    420             return Err(Error::NotFound);
    421         }
    422 
    423         // Convert the raw pointer to a Note instance
    424         Ok(Note::new_transactional(
    425             note_ptr,
    426             len,
    427             note_key,
    428             transaction,
    429         ))
    430     }
    431 
    432     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    433     pub fn get_note_by_id<'a>(
    434         &self,
    435         transaction: &'a Transaction,
    436         id: &[u8; 32],
    437     ) -> Result<Note<'a>> {
    438         let mut len: usize = 0;
    439         let mut primkey: u64 = 0;
    440 
    441         let note_ptr = unsafe {
    442             bindings::ndb_get_note_by_id(
    443                 transaction.as_mut_ptr(),
    444                 id.as_ptr(),
    445                 &mut len,
    446                 &mut primkey,
    447             )
    448         };
    449 
    450         if note_ptr.is_null() {
    451             // Handle null pointer (e.g., note not found or error occurred)
    452             return Err(Error::NotFound);
    453         }
    454 
    455         // Convert the raw pointer to a Note instance
    456         Ok(Note::new_transactional(
    457             note_ptr,
    458             len,
    459             NoteKey::new(primkey),
    460             transaction,
    461         ))
    462     }
    463 
    464     pub fn search_profile<'a>(
    465         &self,
    466         transaction: &'a Transaction,
    467         search: &str,
    468         limit: u32,
    469     ) -> Result<Vec<&'a [u8; 32]>> {
    470         let mut results = Vec::new();
    471 
    472         let mut ndb_search = ndb_search {
    473             key: std::ptr::null_mut(),
    474             profile_key: 0,
    475             cursor: std::ptr::null_mut(),
    476         };
    477 
    478         let c_query = CString::new(search).map_err(|_| Error::DecodeError)?;
    479 
    480         let success = unsafe {
    481             bindings::ndb_search_profile(
    482                 transaction.as_mut_ptr(),
    483                 &mut ndb_search as *mut ndb_search,
    484                 c_query.as_c_str().as_ptr(),
    485             )
    486         };
    487 
    488         if success == 0 {
    489             return Ok(results);
    490         }
    491 
    492         // Add the first result
    493         if let Some(key) = unsafe { ndb_search.key.as_ref() } {
    494             results.push(&key.id);
    495         }
    496 
    497         // Iterate through additional results up to the limit
    498         let mut remaining = limit;
    499         while remaining > 0 {
    500             let next_success =
    501                 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) };
    502 
    503             if next_success == 0 {
    504                 break;
    505             }
    506 
    507             if let Some(key) = unsafe { ndb_search.key.as_ref() } {
    508                 results.push(&key.id);
    509             }
    510 
    511             remaining -= 1;
    512         }
    513 
    514         unsafe {
    515             bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search);
    516         }
    517 
    518         Ok(results)
    519     }
    520 
    521     /// Get the underlying pointer to the context in C
    522     pub fn as_ptr(&self) -> *mut bindings::ndb {
    523         self.refs.ndb
    524     }
    525 }
    526 
    527 #[cfg(test)]
    528 mod tests {
    529     use super::*;
    530     use crate::config::Config;
    531     use crate::test_util;
    532     use tokio::time::{self, sleep, Duration};
    533 
    /// Opening a fresh database directory with the default config succeeds.
    #[test]
    fn ndb_init_works() {
        let db = "target/testdbs/init_works";
        test_util::cleanup_db(db);

        let cfg = Config::new();
        // The handle is dropped immediately; only a successful open matters.
        let _ = Ndb::new(db, &cfg).expect("ok");
    }
    544 
    545     #[tokio::test]
    546     async fn query_works() {
    547         let db = "target/testdbs/query";
    548         test_util::cleanup_db(&db);
    549 
    550         {
    551             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    552 
    553             let filter = Filter::new().kinds(vec![1]).build();
    554             let filters = vec![filter];
    555 
    556             let sub = ndb.subscribe(&filters).expect("sub_id");
    557             let waiter = ndb.wait_for_notes(sub, 1);
    558             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    559             let res = waiter.await.expect("await ok");
    560             assert_eq!(res, vec![NoteKey::new(1)]);
    561             let txn = Transaction::new(&ndb).expect("txn");
    562             let res = ndb.query(&txn, &filters, 1).expect("query ok");
    563             assert_eq!(res.len(), 1);
    564             assert_eq!(
    565                 hex::encode(res[0].note.id()),
    566                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    567             );
    568         }
    569     }
    570 
    571     #[tokio::test]
    572     async fn search_profile_works() {
    573         let db = "target/testdbs/search_profile";
    574         test_util::cleanup_db(&db);
    575 
    576         {
    577             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    578 
    579             let filter = Filter::new().kinds(vec![0]).build();
    580             let filters = vec![filter];
    581 
    582             let sub_id = ndb.subscribe(&filters).expect("sub_id");
    583             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    584 
    585             ndb.process_event(r#"["EVENT","b",{  "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c",  "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24",  "created_at": 1736785355,  "kind": 0,  "tags": [    [      "alt",      "User profile for Derek Ross"    ],    [      "i",      "twitter:derekmross",      "1634343988407726081"    ],    [      "i",      "github:derekross",      "3edaf845975fa4500496a15039323fa3I"    ]  ],  "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}",  "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap();
    586 
    587             ndb.process_event(r#"["EVENT","b",{  "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf",  "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967",  "created_at": 1736017863,  "kind": 0,  "tags": [    [      "client",      "Damus Notedeck"    ]  ],  "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}",  "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap();
    588 
    589             for _ in 0..2 {
    590                 let _ = sub.next().await;
    591             }
    592             let txn = Transaction::new(&ndb).expect("txn");
    593 
    594             let res = ndb.search_profile(&txn, "kernel", 1);
    595             assert!(res.is_ok());
    596             let res = res.unwrap();
    597             assert!(res.len() >= 1);
    598             let kernelkind_bytes: [u8; 32] = [
    599                 0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57,
    600                 0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55,
    601                 0x5f, 0x57, 0x49, 0x67,
    602             ];
    603             assert_eq!(kernelkind_bytes, **res.first().unwrap());
    604 
    605             let res = ndb.search_profile(&txn, "Derek", 1);
    606             assert!(res.is_ok());
    607             let res = res.unwrap();
    608             assert!(res.len() >= 1);
    609             let derek_bytes: [u8; 32] = [
    610                 0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23,
    611                 0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87,
    612                 0x7a, 0x74, 0x5b, 0x24,
    613             ];
    614             assert_eq!(derek_bytes, **res.first().unwrap());
    615         }
    616     }
    617 
    618     #[tokio::test]
    619     async fn subscribe_event_works() {
    620         let db = "target/testdbs/subscribe";
    621         test_util::cleanup_db(&db);
    622 
    623         {
    624             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    625 
    626             let filter = Filter::new().kinds(vec![1]).build();
    627 
    628             let sub = ndb.subscribe(&[filter]).expect("sub_id");
    629             let waiter = ndb.wait_for_notes(sub, 1);
    630             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    631             let res = waiter.await.expect("await ok");
    632             assert_eq!(res, vec![NoteKey::new(1)]);
    633         }
    634     }
    635 
    636     #[tokio::test]
    637     async fn multiple_events_work() {
    638         let db = "target/testdbs/multiple_events";
    639         test_util::cleanup_db(&db);
    640 
    641         {
    642             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    643 
    644             let filter = Filter::new().kinds(vec![1]).build();
    645 
    646             let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
    647             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    648 
    649             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    650             ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
    651             ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
    652 
    653             // this pause causes problems
    654             sleep(Duration::from_millis(100)).await;
    655 
    656             ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
    657             ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
    658             ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
    659 
    660             let timeout_duration = Duration::from_secs(2);
    661             let result = time::timeout(timeout_duration, async {
    662                 let mut count = 0;
    663                 while count < 6 {
    664                     let res = sub.next();
    665                     let _ = res.await.expect("await ok");
    666                     count += 1;
    667                     println!("saw an event, count = {}", count);
    668                 }
    669             })
    670             .await;
    671 
    672             match result {
    673                 Ok(_) => println!("Test completed successfully"),
    674                 Err(_) => panic!("Test timed out"),
    675             }
    676         }
    677     }
    678 
    679     #[tokio::test]
    680     async fn multiple_events_with_final_pause_work() {
    681         let db = "target/testdbs/multiple_events_with_final_pause";
    682         test_util::cleanup_db(&db);
    683 
    684         {
    685             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    686 
    687             let filter = Filter::new().kinds(vec![1]).build();
    688 
    689             let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
    690             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    691 
    692             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    693             ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
    694             ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
    695 
    696             sleep(Duration::from_millis(100)).await;
    697 
    698             ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
    699             ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
    700             ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
    701 
    702             // this final pause causes extra problems
    703             sleep(Duration::from_millis(100)).await;
    704 
    705             let timeout_duration = Duration::from_secs(2);
    706             let result = time::timeout(timeout_duration, async {
    707                 let mut count = 0;
    708                 while count < 6 {
    709                     let res = sub.next();
    710                     let _ = res.await.expect("await ok");
    711                     count += 1;
    712                     println!("saw an event, count = {}", count);
    713                 }
    714             })
    715             .await;
    716 
    717             match result {
    718                 Ok(_) => println!("Test completed successfully"),
    719                 Err(_) => panic!("Test timed out"),
    720             }
    721         }
    722     }
    723 
    /// Polling a subscription right after submitting an event returns nothing;
    /// polling again after a 150ms sleep returns the first note key.
    #[test]
    fn poll_note_works() {
        let db = "target/testdbs/poll";
        test_util::cleanup_db(&db);

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");

            let kind1_filter = Filter::new().kinds(vec![1]).build();
            let sub = ndb.subscribe(&[kind1_filter]).expect("sub_id");

            ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");

            // polled immediately: this is too fast, so nothing is expected yet
            assert_eq!(ndb.poll_for_notes(sub, 1), vec![]);

            // after a short wait the note should be visible to the poll
            std::thread::sleep(std::time::Duration::from_millis(150));
            assert_eq!(ndb.poll_for_notes(sub, 1), vec![NoteKey::new(1)]);
        }
    }
    746 
    /// Ingests a single event, drops the database handle, reopens the same
    /// path and checks that the note can be fetched by id and has kind 1.
    #[test]
    fn process_event_works() {
        let db = "target/testdbs/event_works";
        test_util::cleanup_db(&db);

        // First handle: submit the event, then let the handle drop.
        {
            let writer = Ndb::new(db, &Config::new()).expect("ndb");
            writer.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
        }

        // Second handle: look the note up by its 32-byte id.
        {
            let reader = Ndb::new(db, &Config::new()).expect("ndb");
            let id_bytes: [u8; 32] =
                hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
                    .expect("hex id")
                    .try_into()
                    .expect("id bytes");
            let mut txn = Transaction::new(&reader).expect("txn");
            let note = reader.get_note_by_id(&mut txn, &id_bytes).expect("note");
            assert_eq!(note.kind(), 1);
        }
    }
    768 
    /// Windows-only: opening a database with a far-too-large mapsize must
    /// still succeed, and the resulting `data.mdb` must be non-empty and not
    /// exactly 1 MiB in size.
    ///
    /// The whole test is compiled only on Windows, so no runtime platform
    /// check is needed inside the body.
    #[test]
    #[cfg(target_os = "windows")]
    fn test_windows_large_mapsize() {
        use std::{fs, path::Path};

        let db = "target/testdbs/windows_large_mapsize";
        test_util::cleanup_db(&db);

        {
            // 32 TiB should be way too big for CI
            let config =
                Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize);

            // in this case, nostrdb should try to keep resizing to
            // smaller mapsizes until success
            let ndb = Ndb::new(db, &config);

            assert!(ndb.is_ok());
        }

        // The handle above has been dropped; inspect the data file on disk.
        let file_len = fs::metadata(Path::new(db).join("data.mdb"))
            .expect("metadata")
            .len();

        assert!(file_len > 0);

        // on windows the default mapsize will be 1MB when we fail
        // to open it
        assert_ne!(file_len, 1048576);

        // we should definitely clean this up... especially on windows
        test_util::cleanup_db(&db);
    }
    807 
    808     #[tokio::test]
    809     async fn test_unsub_on_drop() {
    810         let db = "target/testdbs/test_unsub_on_drop";
    811         test_util::cleanup_db(&db);
    812 
    813         {
    814             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    815             let sub_id = {
    816                 let filter = Filter::new().kinds(vec![1]).build();
    817                 let filters = vec![filter];
    818 
    819                 let sub_id = ndb.subscribe(&filters).expect("sub_id");
    820                 let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    821 
    822                 let res = sub.next();
    823 
    824                 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    825 
    826                 let res = res.await.expect("await ok");
    827                 assert_eq!(res, vec![NoteKey::new(1)]);
    828 
    829                 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
    830                 sub_id
    831             };
    832 
    833             // ensure subscription state is removed after stream is dropped
    834             assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
    835             assert_eq!(ndb.subscription_count(), 0);
    836         }
    837 
    838         test_util::cleanup_db(&db);
    839     }
    840 
    841     #[tokio::test]
    842     async fn test_stream() {
    843         let db = "target/testdbs/test_stream";
    844         test_util::cleanup_db(&db);
    845 
    846         {
    847             let mut ndb = Ndb::new(db, &Config::new()).expect("ndb");
    848             let sub_id = {
    849                 let filter = Filter::new().kinds(vec![1]).build();
    850                 let filters = vec![filter];
    851 
    852                 let sub_id = ndb.subscribe(&filters).expect("sub_id");
    853                 let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    854 
    855                 let res = sub.next();
    856 
    857                 let _ = ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    858 
    859                 let _ = ndb.process_event(r#"["EVENT",{"content":"👀","created_at":1761514455,"id":"66af95a6bdfec756344f48241562b684082ff9c76ea940c11c4fd85e91e1219c","kind":7,"pubkey":"d5805ae449e108e907091c67cdf49a9835b3cac3dd11489ad215c0ddf7c658fc","sig":"69f4a3fe7c1cc6aa9c9cc4a2e90e4b71c3b9afaad262e68b92336e0493ff1a748b5dcc20ab6e86d4551dc5ea680ddfa1c08d47f9e4845927e143e8ef2183479b","tags":[["e","d44ad96cb8924092a76bc2afddeb12eb85233c0d03a7d9adc42c2a85a79a4305","wss://relay.primal.net/","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],["p","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9","wss://relay.primal.net/"],["k","1"]]}]"#);
    860                 let res = res.await.expect("await ok");
    861                 assert_eq!(res, vec![NoteKey::new(1)]);
    862 
    863                 // ensure that unsubscribing kills the stream
    864                 assert!(ndb.unsubscribe(sub_id).is_ok());
    865                 assert!(sub.next().await.is_none());
    866 
    867                 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
    868                 sub_id
    869             };
    870 
    871             // ensure subscription state is removed after stream is dropped
    872             assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
    873         }
    874 
    875         test_util::cleanup_db(&db);
    876     }
    877 }