nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

ndb.rs (38845B)


      1 use std::ffi::CString;
      2 use std::ptr;
      3 
      4 use crate::bindings::ndb_search;
      5 use crate::{
      6     bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, ProfileKey,
      7     ProfileRecord, QueryResult, Result, Subscription, SubscriptionState, SubscriptionStream,
      8     Transaction,
      9 };
     10 use futures::StreamExt;
     11 use std::collections::hash_map::Entry;
     12 use std::collections::HashMap;
     13 use std::fs;
     14 use std::os::raw::c_int;
     15 use std::path::Path;
     16 use std::sync::{Arc, Mutex};
     17 use tracing::debug;
     18 
     19 #[derive(Debug)]
     20 struct NdbRef {
     21     ndb: *mut bindings::ndb,
     22     rust_cb_ctx: *mut ::std::os::raw::c_void,
     23 }
     24 
     25 /// SAFETY: thread safety is ensured by nostrdb
     26 unsafe impl Send for NdbRef {}
     27 
     28 /// SAFETY: thread safety is ensured by nostrdb
     29 unsafe impl Sync for NdbRef {}
     30 
     31 /// The database is automatically closed when [Ndb] is [Drop]ped.
     32 impl Drop for NdbRef {
     33     fn drop(&mut self) {
     34         unsafe {
     35             bindings::ndb_destroy(self.ndb);
     36 
     37             if !self.rust_cb_ctx.is_null() {
     38                 // Rebuild the Box from the raw pointer and drop it.
     39                 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
     40             }
     41         }
     42     }
     43 }
     44 
     45 type SubMap = HashMap<Subscription, SubscriptionState>;
     46 
     47 /// A nostrdb context. Construct one of these with [Ndb::new].
     48 #[derive(Debug, Clone)]
     49 pub struct Ndb {
     50     refs: Arc<NdbRef>,
     51 
     52     /// Track query future states
     53     pub(crate) subs: Arc<Mutex<SubMap>>,
     54 }
     55 
     56 impl Ndb {
     57     /// Construct a new nostrdb context. Takes a directory where the database
     58     /// is/will be located and a nostrdb config.
     59     pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
     60         let db_dir_cstr = match CString::new(db_dir) {
     61             Ok(cstr) => cstr,
     62             Err(_) => return Err(Error::DbOpenFailed),
     63         };
     64         let mut ndb: *mut bindings::ndb = ptr::null_mut();
     65 
     66         let path = Path::new(db_dir);
     67         if !path.exists() {
     68             let _ = fs::create_dir_all(path);
     69         }
     70 
     71         let min_mapsize = 1024 * 1024 * 512;
     72         let mut mapsize = config.config.mapsize;
     73         let config = *config;
     74 
     75         let prev_callback = config.config.sub_cb;
     76         let prev_callback_ctx = config.config.sub_cb_ctx;
     77         let subs = Arc::new(Mutex::new(SubMap::default()));
     78         let subs_clone = subs.clone();
     79 
     80         // We need to register our own callback so that we can wake
     81         // query futures
     82         let mut config = config.set_sub_callback(move |sub_id: u64| {
     83             let mut map = subs_clone.lock().unwrap();
     84             if let Some(s) = map.get_mut(&Subscription::new(sub_id)) {
     85                 if let Some(w) = s.waker.take() {
     86                     w.wake();
     87                 }
     88             }
     89 
     90             if let Some(pcb) = prev_callback {
     91                 unsafe {
     92                     pcb(prev_callback_ctx, sub_id);
     93                 };
     94             }
     95         });
     96 
     97         let result = loop {
     98             let result =
     99                 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
    100 
    101             if result == 0 {
    102                 mapsize /= 2;
    103                 config = config.set_mapsize(mapsize);
    104                 debug!("ndb init failed, reducing mapsize to {}", mapsize);
    105 
    106                 if mapsize > min_mapsize {
    107                     continue;
    108                 } else {
    109                     break 0;
    110                 }
    111             } else {
    112                 break result;
    113             }
    114         };
    115 
    116         if result == 0 {
    117             return Err(Error::DbOpenFailed);
    118         }
    119 
    120         let rust_cb_ctx = config.config.sub_cb_ctx;
    121         let refs = Arc::new(NdbRef { ndb, rust_cb_ctx });
    122 
    123         Ok(Ndb { refs, subs })
    124     }
    125 
    126     /// Ingest a relay or client sent event, with optional relay metadata.
    127     /// This function returns immediately and doesn't provide any information on
    128     /// if ingestion was successful or not.
    129     pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> {
    130         // Convert the Rust string to a C-style string
    131         let c_json = CString::new(json)?;
    132         let c_json_ptr = c_json.as_ptr();
    133 
    134         // Get the length of the string
    135         let len = json.len() as libc::c_int;
    136 
    137         let res = unsafe {
    138             bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr())
    139         };
    140 
    141         if res == 0 {
    142             return Err(Error::NoteProcessFailed);
    143         }
    144 
    145         Ok(())
    146     }
    147 
    148     /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
    149     /// This function returns immediately and doesn't provide any information on
    150     /// if ingestion was successful or not.
    151     #[deprecated(
    152         note = "Use `process_event_with` with IngestMetadata::new().client(false).relay(...)"
    153     )]
    154     pub fn process_event(&self, json: &str) -> Result<()> {
    155         self.process_event_with(json, IngestMetadata::new().client(false))
    156     }
    157 
    158     /// Ingest a client-sent event in the form `["EVENT", {"id:"...}]`
    159     /// This function returns immediately and doesn't provide any information on
    160     /// if ingestion was successful or not.
    161     #[deprecated(
    162         note = "Use `process_event_with` with IngestMetadata::new().client(true).relay(...)"
    163     )]
    164     pub fn process_client_event(&self, json: &str) -> Result<()> {
    165         self.process_event_with(json, IngestMetadata::new().client(true))
    166     }
    167 
    168     pub fn query<'a>(
    169         &self,
    170         txn: &'a Transaction,
    171         filters: &[Filter],
    172         max_results: i32,
    173     ) -> Result<Vec<QueryResult<'a>>> {
    174         let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
    175         let mut out: Vec<bindings::ndb_query_result> = vec![];
    176         let mut returned: i32 = 0;
    177         out.reserve_exact(max_results as usize);
    178         let res = unsafe {
    179             bindings::ndb_query(
    180                 txn.as_mut_ptr(),
    181                 ndb_filters.as_mut_ptr(),
    182                 ndb_filters.len() as i32,
    183                 out.as_mut_ptr(),
    184                 max_results,
    185                 &mut returned as *mut i32,
    186             )
    187         };
    188         if res == 1 {
    189             unsafe {
    190                 out.set_len(returned as usize);
    191             };
    192             Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
    193         } else {
    194             Err(Error::QueryError)
    195         }
    196     }
    197 
    198     pub fn subscription_count(&self) -> u32 {
    199         unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
    200     }
    201 
    202     pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> {
    203         let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) };
    204 
    205         debug!(
    206             "unsubscribed from {}, sub count {}",
    207             sub.id(),
    208             self.subscription_count()
    209         );
    210 
    211         // mark the subscription as done if it exists in our stream map
    212         {
    213             let mut map = self.subs.lock().unwrap();
    214             if let Entry::Occupied(mut entry) = map.entry(sub) {
    215                 entry.get_mut().done = true;
    216             }
    217         }
    218 
    219         if r == 0 {
    220             Err(Error::SubscriptionError)
    221         } else {
    222             Ok(())
    223         }
    224     }
    225 
    226     pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> {
    227         unsafe {
    228             let mut ndb_filters: Vec<bindings::ndb_filter> =
    229                 filters.iter().map(|a| a.data).collect();
    230             let id = bindings::ndb_subscribe(
    231                 self.as_ptr(),
    232                 ndb_filters.as_mut_ptr(),
    233                 filters.len() as i32,
    234             );
    235             if id == 0 {
    236                 Err(Error::SubscriptionError)
    237             } else {
    238                 Ok(Subscription::new(id))
    239             }
    240         }
    241     }
    242 
    243     pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> {
    244         let mut vec = vec![];
    245         vec.reserve_exact(max_notes as usize);
    246 
    247         unsafe {
    248             let res = bindings::ndb_poll_for_notes(
    249                 self.as_ptr(),
    250                 sub.id(),
    251                 vec.as_mut_ptr(),
    252                 max_notes as c_int,
    253             );
    254             vec.set_len(res as usize);
    255         };
    256 
    257         vec.into_iter().map(NoteKey::new).collect()
    258     }
    259 
    260     pub async fn wait_for_notes(
    261         &self,
    262         sub_id: Subscription,
    263         max_notes: u32,
    264     ) -> Result<Vec<NoteKey>> {
    265         let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes);
    266 
    267         match stream.next().await {
    268             Some(res) => Ok(res),
    269             None => Err(Error::SubscriptionError),
    270         }
    271     }
    272 
    273     pub fn get_profile_by_key<'a>(
    274         &self,
    275         transaction: &'a Transaction,
    276         key: ProfileKey,
    277     ) -> Result<ProfileRecord<'a>> {
    278         let mut len: usize = 0;
    279 
    280         let profile_record_ptr = unsafe {
    281             bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
    282         };
    283 
    284         if profile_record_ptr.is_null() {
    285             // Handle null pointer (e.g., note not found or error occurred)
    286             return Err(Error::NotFound);
    287         }
    288 
    289         // Convert the raw pointer to a Note instance
    290         Ok(ProfileRecord::new_transactional(
    291             profile_record_ptr,
    292             len,
    293             key,
    294             transaction,
    295         ))
    296     }
    297 
    298     pub fn get_profile_by_pubkey<'a>(
    299         &self,
    300         transaction: &'a Transaction,
    301         id: &[u8; 32],
    302     ) -> Result<ProfileRecord<'a>> {
    303         let mut len: usize = 0;
    304         let mut primkey: u64 = 0;
    305 
    306         let profile_record_ptr = unsafe {
    307             bindings::ndb_get_profile_by_pubkey(
    308                 transaction.as_mut_ptr(),
    309                 id.as_ptr(),
    310                 &mut len,
    311                 &mut primkey,
    312             )
    313         };
    314 
    315         if profile_record_ptr.is_null() {
    316             // Handle null pointer (e.g., note not found or error occurred)
    317             return Err(Error::NotFound);
    318         }
    319 
    320         // Convert the raw pointer to a Note instance
    321         Ok(ProfileRecord::new_transactional(
    322             profile_record_ptr,
    323             len,
    324             ProfileKey::new(primkey),
    325             transaction,
    326         ))
    327     }
    328 
    329     pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> {
    330         let res = unsafe {
    331             bindings::ndb_get_notekey_by_id(
    332                 txn.as_mut_ptr(),
    333                 id.as_ptr() as *const ::std::os::raw::c_uchar,
    334             )
    335         };
    336 
    337         if res == 0 {
    338             return Err(Error::NotFound);
    339         }
    340 
    341         Ok(NoteKey::new(res))
    342     }
    343 
    344     pub fn get_profilekey_by_pubkey(
    345         &self,
    346         txn: &Transaction,
    347         pubkey: &[u8; 32],
    348     ) -> Result<ProfileKey> {
    349         let res = unsafe {
    350             bindings::ndb_get_profilekey_by_pubkey(
    351                 txn.as_mut_ptr(),
    352                 pubkey.as_ptr() as *const ::std::os::raw::c_uchar,
    353             )
    354         };
    355 
    356         if res == 0 {
    357             return Err(Error::NotFound);
    358         }
    359 
    360         Ok(ProfileKey::new(res))
    361     }
    362 
    363     pub fn get_blocks_by_key<'a>(
    364         &self,
    365         txn: &'a Transaction,
    366         note_key: NoteKey,
    367     ) -> Result<Blocks<'a>> {
    368         let blocks_ptr = unsafe {
    369             bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
    370         };
    371 
    372         if blocks_ptr.is_null() {
    373             return Err(Error::NotFound);
    374         }
    375 
    376         Ok(Blocks::new_transactional(blocks_ptr, txn))
    377     }
    378 
    379     pub fn get_note_by_key<'a>(
    380         &self,
    381         transaction: &'a Transaction,
    382         note_key: NoteKey,
    383     ) -> Result<Note<'a>> {
    384         let mut len: usize = 0;
    385 
    386         let note_ptr = unsafe {
    387             bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
    388         };
    389 
    390         if note_ptr.is_null() {
    391             // Handle null pointer (e.g., note not found or error occurred)
    392             return Err(Error::NotFound);
    393         }
    394 
    395         // Convert the raw pointer to a Note instance
    396         Ok(Note::new_transactional(
    397             note_ptr,
    398             len,
    399             note_key,
    400             transaction,
    401         ))
    402     }
    403 
    404     /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
    405     pub fn get_note_by_id<'a>(
    406         &self,
    407         transaction: &'a Transaction,
    408         id: &[u8; 32],
    409     ) -> Result<Note<'a>> {
    410         let mut len: usize = 0;
    411         let mut primkey: u64 = 0;
    412 
    413         let note_ptr = unsafe {
    414             bindings::ndb_get_note_by_id(
    415                 transaction.as_mut_ptr(),
    416                 id.as_ptr(),
    417                 &mut len,
    418                 &mut primkey,
    419             )
    420         };
    421 
    422         if note_ptr.is_null() {
    423             // Handle null pointer (e.g., note not found or error occurred)
    424             return Err(Error::NotFound);
    425         }
    426 
    427         // Convert the raw pointer to a Note instance
    428         Ok(Note::new_transactional(
    429             note_ptr,
    430             len,
    431             NoteKey::new(primkey),
    432             transaction,
    433         ))
    434     }
    435 
    436     pub fn search_profile<'a>(
    437         &self,
    438         transaction: &'a Transaction,
    439         search: &str,
    440         limit: u32,
    441     ) -> Result<Vec<&'a [u8; 32]>> {
    442         let mut results = Vec::new();
    443 
    444         let mut ndb_search = ndb_search {
    445             key: std::ptr::null_mut(),
    446             profile_key: 0,
    447             cursor: std::ptr::null_mut(),
    448         };
    449 
    450         let c_query = CString::new(search).map_err(|_| Error::DecodeError)?;
    451 
    452         let success = unsafe {
    453             bindings::ndb_search_profile(
    454                 transaction.as_mut_ptr(),
    455                 &mut ndb_search as *mut ndb_search,
    456                 c_query.as_c_str().as_ptr(),
    457             )
    458         };
    459 
    460         if success == 0 {
    461             return Ok(results);
    462         }
    463 
    464         // Add the first result
    465         if let Some(key) = unsafe { ndb_search.key.as_ref() } {
    466             results.push(&key.id);
    467         }
    468 
    469         // Iterate through additional results up to the limit
    470         let mut remaining = limit;
    471         while remaining > 0 {
    472             let next_success =
    473                 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) };
    474 
    475             if next_success == 0 {
    476                 break;
    477             }
    478 
    479             if let Some(key) = unsafe { ndb_search.key.as_ref() } {
    480                 results.push(&key.id);
    481             }
    482 
    483             remaining -= 1;
    484         }
    485 
    486         unsafe {
    487             bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search);
    488         }
    489 
    490         Ok(results)
    491     }
    492 
    493     /// Get the underlying pointer to the context in C
    494     pub fn as_ptr(&self) -> *mut bindings::ndb {
    495         self.refs.ndb
    496     }
    497 }
    498 
    499 #[cfg(test)]
    500 mod tests {
    501     use super::*;
    502     use crate::config::Config;
    503     use crate::test_util;
    504     use tokio::time::{self, sleep, Duration};
    505 
    506     #[test]
    507     fn ndb_init_works() {
    508         let db = "target/testdbs/init_works";
    509         test_util::cleanup_db(db);
    510 
    511         {
    512             let cfg = Config::new();
    513             let _ = Ndb::new(db, &cfg).expect("ok");
    514         }
    515     }
    516 
    517     #[tokio::test]
    518     async fn query_works() {
    519         let db = "target/testdbs/query";
    520         test_util::cleanup_db(&db);
    521 
    522         {
    523             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    524 
    525             let filter = Filter::new().kinds(vec![1]).build();
    526             let filters = vec![filter];
    527 
    528             let sub = ndb.subscribe(&filters).expect("sub_id");
    529             let waiter = ndb.wait_for_notes(sub, 1);
    530             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    531             let res = waiter.await.expect("await ok");
    532             assert_eq!(res, vec![NoteKey::new(1)]);
    533             let txn = Transaction::new(&ndb).expect("txn");
    534             let res = ndb.query(&txn, &filters, 1).expect("query ok");
    535             assert_eq!(res.len(), 1);
    536             assert_eq!(
    537                 hex::encode(res[0].note.id()),
    538                 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
    539             );
    540         }
    541     }
    542 
    543     #[tokio::test]
    544     async fn search_profile_works() {
    545         let db = "target/testdbs/search_profile";
    546         test_util::cleanup_db(&db);
    547 
    548         {
    549             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    550 
    551             let filter = Filter::new().kinds(vec![0]).build();
    552             let filters = vec![filter];
    553 
    554             let sub_id = ndb.subscribe(&filters).expect("sub_id");
    555             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    556             ndb.process_event(r#"["EVENT","b",{  "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c",  "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24",  "created_at": 1736785355,  "kind": 0,  "tags": [    [      "alt",      "User profile for Derek Ross"    ],    [      "i",      "twitter:derekmross",      "1634343988407726081"    ],    [      "i",      "github:derekross",      "3edaf845975fa4500496a15039323fa3I"    ]  ],  "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}",  "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap();
    557             ndb.process_event(r#"["EVENT","b",{  "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf",  "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967",  "created_at": 1736017863,  "kind": 0,  "tags": [    [      "client",      "Damus Notedeck"    ]  ],  "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}",  "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap();
    558             ndb.process_event(r#"["EVENT","b",{  "id": "3e9e3b63a7831f09bf2963616a2440e6f30c6e95adbc7841d59376ec100ae9dc",  "pubkey": "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245",  "created_at": 1737466417,  "kind": 0,  "tags": [],  "content": "{\"banner\":\"https://nostr.build/i/3d6f22d45d95ecc2c19b1acdec57aa15f2dba9c423b536e26fc62707c125f557.jpg\",\"website\":\"https://damus.io\",\"nip05\":\"_@jb55.com\",\"display_name\":\"\",\"about\":\"I made damus, zaps, and npubs. Bitcoin core, lightning, and nostr dev. \",\"picture\":\"https://cdn.jb55.com/img/red-me.jpg\",\"name\":\"jb55\",\"lud16\":\"jb55@sendsats.lol\"}",  "sig": "9cf1c89a4dbb2888e0f5fc300e56f93eb788bd84d3d0f8b52e4ac4abdd92256b0fb694bfd82d917c3923f01e8eac7886bb75c8043dcd9d4e070e4eaa5ab3bd0a"}]"#).unwrap();
    559             for _ in 0..3 {
    560                 let _ = sub.next().await;
    561             }
    562             let txn = Transaction::new(&ndb).expect("txn");
    563 
    564             let res = ndb.search_profile(&txn, "jb55", 1);
    565             assert!(res.is_ok());
    566             let res = res.unwrap();
    567             assert!(res.len() >= 1);
    568             let will_bytes: [u8; 32] = [
    569                 0x32, 0xe1, 0x82, 0x76, 0x35, 0x45, 0x0e, 0xbb, 0x3c, 0x5a, 0x7d, 0x12, 0xc1, 0xf8,
    570                 0xe7, 0xb2, 0xb5, 0x14, 0x43, 0x9a, 0xc1, 0x0a, 0x67, 0xee, 0xf3, 0xd9, 0xfd, 0x9c,
    571                 0x5c, 0x68, 0xe2, 0x45,
    572             ];
    573             assert_eq!(will_bytes, **res.first().unwrap());
    574 
    575             let res = ndb.search_profile(&txn, "kernel", 1);
    576             assert!(res.is_ok());
    577             let res = res.unwrap();
    578             assert!(res.len() >= 1);
    579             let kernelkind_bytes: [u8; 32] = [
    580                 0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57,
    581                 0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55,
    582                 0x5f, 0x57, 0x49, 0x67,
    583             ];
    584             assert_eq!(kernelkind_bytes, **res.first().unwrap());
    585 
    586             let res = ndb.search_profile(&txn, "Derek", 1);
    587             assert!(res.is_ok());
    588             let res = res.unwrap();
    589             assert!(res.len() >= 1);
    590             let derek_bytes: [u8; 32] = [
    591                 0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23,
    592                 0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87,
    593                 0x7a, 0x74, 0x5b, 0x24,
    594             ];
    595             assert_eq!(derek_bytes, **res.first().unwrap());
    596         }
    597     }
    598 
    599     #[tokio::test]
    600     async fn subscribe_event_works() {
    601         let db = "target/testdbs/subscribe";
    602         test_util::cleanup_db(&db);
    603 
    604         {
    605             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    606 
    607             let filter = Filter::new().kinds(vec![1]).build();
    608 
    609             let sub = ndb.subscribe(&[filter]).expect("sub_id");
    610             let waiter = ndb.wait_for_notes(sub, 1);
    611             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    612             let res = waiter.await.expect("await ok");
    613             assert_eq!(res, vec![NoteKey::new(1)]);
    614         }
    615     }
    616 
    617     #[tokio::test]
    618     async fn multiple_events_work() {
    619         let db = "target/testdbs/multiple_events";
    620         test_util::cleanup_db(&db);
    621 
    622         {
    623             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    624 
    625             let filter = Filter::new().kinds(vec![1]).build();
    626 
    627             let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
    628             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    629 
    630             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    631             ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
    632             ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
    633 
    634             // this pause causes problems
    635             sleep(Duration::from_millis(100)).await;
    636 
    637             ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
    638             ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
    639             ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
    640 
    641             let timeout_duration = Duration::from_secs(2);
    642             let result = time::timeout(timeout_duration, async {
    643                 let mut count = 0;
    644                 while count < 6 {
    645                     let res = sub.next();
    646                     let _ = res.await.expect("await ok");
    647                     count += 1;
    648                     println!("saw an event, count = {}", count);
    649                 }
    650             })
    651             .await;
    652 
    653             match result {
    654                 Ok(_) => println!("Test completed successfully"),
    655                 Err(_) => panic!("Test timed out"),
    656             }
    657         }
    658     }
    659 
    660     #[tokio::test]
    661     async fn multiple_events_with_final_pause_work() {
    662         let db = "target/testdbs/multiple_events_with_final_pause";
    663         test_util::cleanup_db(&db);
    664 
    665         {
    666             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    667 
    668             let filter = Filter::new().kinds(vec![1]).build();
    669 
    670             let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
    671             let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    672 
    673             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    674             ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
    675             ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
    676 
    677             sleep(Duration::from_millis(100)).await;
    678 
    679             ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
    680             ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
    681             ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
    682 
    683             // this final pause causes extra problems
    684             sleep(Duration::from_millis(100)).await;
    685 
    686             let timeout_duration = Duration::from_secs(2);
    687             let result = time::timeout(timeout_duration, async {
    688                 let mut count = 0;
    689                 while count < 6 {
    690                     let res = sub.next();
    691                     let _ = res.await.expect("await ok");
    692                     count += 1;
    693                     println!("saw an event, count = {}", count);
    694                 }
    695             })
    696             .await;
    697 
    698             match result {
    699                 Ok(_) => println!("Test completed successfully"),
    700                 Err(_) => panic!("Test timed out"),
    701             }
    702         }
    703     }
    704 
    705     #[test]
    706     fn poll_note_works() {
    707         let db = "target/testdbs/poll";
    708         test_util::cleanup_db(&db);
    709 
    710         {
    711             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    712 
    713             let filter = Filter::new().kinds(vec![1]).build();
    714 
    715             let sub = ndb.subscribe(&[filter]).expect("sub_id");
    716             ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    717             // this is too fast, we should have nothing
    718             let res = ndb.poll_for_notes(sub, 1);
    719             assert_eq!(res, vec![]);
    720 
    721             std::thread::sleep(std::time::Duration::from_millis(150));
    722             // now we should have something
    723             let res = ndb.poll_for_notes(sub, 1);
    724             assert_eq!(res, vec![NoteKey::new(1)]);
    725         }
    726     }
    727 
    728     #[test]
    729     fn process_event_works() {
    730         let db = "target/testdbs/event_works";
    731         test_util::cleanup_db(&db);
    732 
    733         {
    734             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    735             ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    736         }
    737 
    738         {
    739             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    740             let id =
    741                 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3")
    742                     .expect("hex id");
    743             let mut txn = Transaction::new(&ndb).expect("txn");
    744             let id_bytes: [u8; 32] = id.try_into().expect("id bytes");
    745             let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note");
    746             assert_eq!(note.kind(), 1);
    747         }
    748     }
    749 
    750     #[test]
    751     #[cfg(target_os = "windows")]
    752     fn test_windows_large_mapsize() {
    753         use std::{fs, path::Path};
    754 
    755         let db = "target/testdbs/windows_large_mapsize";
    756         test_util::cleanup_db(&db);
    757 
    758         {
    759             // 32 TiB should be way too big for CI
    760             let config =
    761                 Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize);
    762 
    763             // in this case, nostrdb should try to keep resizing to
    764             // smaller mapsizes until success
    765 
    766             let ndb = Ndb::new(db, &config);
    767 
    768             assert!(ndb.is_ok());
    769         }
    770 
    771         let file_len = fs::metadata(Path::new(db).join("data.mdb"))
    772             .expect("metadata")
    773             .len();
    774 
    775         assert!(file_len > 0);
    776 
    777         if cfg!(target_os = "windows") {
    778             // on windows the default mapsize will be 1MB when we fail
    779             // to open it
    780             assert_ne!(file_len, 1048576);
    781         } else {
    782             assert!(file_len < 1024u64 * 1024u64);
    783         }
    784 
    785         // we should definitely clean this up... especially on windows
    786         test_util::cleanup_db(&db);
    787     }
    788 
    789     #[tokio::test]
    790     async fn test_unsub_on_drop() {
    791         let db = "target/testdbs/test_unsub_on_drop";
    792         test_util::cleanup_db(&db);
    793 
    794         {
    795             let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    796             let sub_id = {
    797                 let filter = Filter::new().kinds(vec![1]).build();
    798                 let filters = vec![filter];
    799 
    800                 let sub_id = ndb.subscribe(&filters).expect("sub_id");
    801                 let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    802 
    803                 let res = sub.next();
    804 
    805                 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    806 
    807                 let res = res.await.expect("await ok");
    808                 assert_eq!(res, vec![NoteKey::new(1)]);
    809 
    810                 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
    811                 sub_id
    812             };
    813 
    814             // ensure subscription state is removed after stream is dropped
    815             assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
    816             assert_eq!(ndb.subscription_count(), 0);
    817         }
    818 
    819         test_util::cleanup_db(&db);
    820     }
    821 
    822     #[tokio::test]
    823     async fn test_stream() {
    824         let db = "target/testdbs/test_stream";
    825         test_util::cleanup_db(&db);
    826 
    827         {
    828             let mut ndb = Ndb::new(db, &Config::new()).expect("ndb");
    829             let sub_id = {
    830                 let filter = Filter::new().kinds(vec![1]).build();
    831                 let filters = vec![filter];
    832 
    833                 let sub_id = ndb.subscribe(&filters).expect("sub_id");
    834                 let mut sub = sub_id.stream(&ndb).notes_per_await(1);
    835 
    836                 let res = sub.next();
    837 
    838                 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    839 
    840                 let res = res.await.expect("await ok");
    841                 assert_eq!(res, vec![NoteKey::new(1)]);
    842 
    843                 // ensure that unsubscribing kills the stream
    844                 assert!(ndb.unsubscribe(sub_id).is_ok());
    845                 assert!(sub.next().await.is_none());
    846 
    847                 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
    848                 sub_id
    849             };
    850 
    851             // ensure subscription state is removed after stream is dropped
    852             assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
    853         }
    854 
    855         test_util::cleanup_db(&db);
    856     }
    857 }