ndb.rs (38845B)
use std::ffi::CString;
use std::ptr;

use crate::bindings::ndb_search;
use crate::{
    bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, ProfileKey,
    ProfileRecord, QueryResult, Result, Subscription, SubscriptionState, SubscriptionStream,
    Transaction,
};
use futures::StreamExt;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs;
use std::os::raw::c_int;
use std::path::Path;
use std::sync::{Arc, Mutex};
use tracing::debug;

/// Owns the raw nostrdb handle together with the callback context pointer
/// that was registered with it. Both are released in [Drop].
#[derive(Debug)]
struct NdbRef {
    /// Raw handle filled in by `ndb_init`.
    ndb: *mut bindings::ndb,
    /// Callback context read back from the config after `set_sub_callback`;
    /// may be null, in which case nothing is freed on drop.
    rust_cb_ctx: *mut ::std::os::raw::c_void,
}

/// SAFETY: thread safety is ensured by nostrdb
unsafe impl Send for NdbRef {}

/// SAFETY: thread safety is ensured by nostrdb
unsafe impl Sync for NdbRef {}

/// The database is automatically closed when [Ndb] is [Drop]ped.
impl Drop for NdbRef {
    fn drop(&mut self) {
        unsafe {
            // Destroy the database first so that nothing can still invoke the
            // callback once its context is freed below.
            // NOTE(review): presumes ndb_destroy stops all callback activity
            // before returning -- confirm against nostrdb.
            bindings::ndb_destroy(self.ndb);

            if !self.rust_cb_ctx.is_null() {
                // Rebuild the Box from the raw pointer and drop it.
                // NOTE(review): the callback registered in `Ndb::new` takes a
                // `u64`, while this cast names `dyn FnMut()`; confirm the cast
                // matches the type boxed by `Config::set_sub_callback`.
                let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
            }
        }
    }
}

/// Per-subscription state, keyed by subscription handle.
type SubMap = HashMap<Subscription, SubscriptionState>;

/// A nostrdb context. Construct one of these with [Ndb::new].
///
/// Cloning is cheap: clones share the same underlying handle and the same
/// subscription map through `Arc`s.
#[derive(Debug, Clone)]
pub struct Ndb {
    /// Shared owner of the raw database handle.
    refs: Arc<NdbRef>,

    /// Track query future states
    pub(crate) subs: Arc<Mutex<SubMap>>,
}

impl Ndb {
    /// Construct a new nostrdb context. Takes a directory where the database
    /// is/will be located and a nostrdb config.
59 pub fn new(db_dir: &str, config: &Config) -> Result<Self> { 60 let db_dir_cstr = match CString::new(db_dir) { 61 Ok(cstr) => cstr, 62 Err(_) => return Err(Error::DbOpenFailed), 63 }; 64 let mut ndb: *mut bindings::ndb = ptr::null_mut(); 65 66 let path = Path::new(db_dir); 67 if !path.exists() { 68 let _ = fs::create_dir_all(path); 69 } 70 71 let min_mapsize = 1024 * 1024 * 512; 72 let mut mapsize = config.config.mapsize; 73 let config = *config; 74 75 let prev_callback = config.config.sub_cb; 76 let prev_callback_ctx = config.config.sub_cb_ctx; 77 let subs = Arc::new(Mutex::new(SubMap::default())); 78 let subs_clone = subs.clone(); 79 80 // We need to register our own callback so that we can wake 81 // query futures 82 let mut config = config.set_sub_callback(move |sub_id: u64| { 83 let mut map = subs_clone.lock().unwrap(); 84 if let Some(s) = map.get_mut(&Subscription::new(sub_id)) { 85 if let Some(w) = s.waker.take() { 86 w.wake(); 87 } 88 } 89 90 if let Some(pcb) = prev_callback { 91 unsafe { 92 pcb(prev_callback_ctx, sub_id); 93 }; 94 } 95 }); 96 97 let result = loop { 98 let result = 99 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) }; 100 101 if result == 0 { 102 mapsize /= 2; 103 config = config.set_mapsize(mapsize); 104 debug!("ndb init failed, reducing mapsize to {}", mapsize); 105 106 if mapsize > min_mapsize { 107 continue; 108 } else { 109 break 0; 110 } 111 } else { 112 break result; 113 } 114 }; 115 116 if result == 0 { 117 return Err(Error::DbOpenFailed); 118 } 119 120 let rust_cb_ctx = config.config.sub_cb_ctx; 121 let refs = Arc::new(NdbRef { ndb, rust_cb_ctx }); 122 123 Ok(Ndb { refs, subs }) 124 } 125 126 /// Ingest a relay or client sent event, with optional relay metadata. 127 /// This function returns immediately and doesn't provide any information on 128 /// if ingestion was successful or not. 
129 pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> { 130 // Convert the Rust string to a C-style string 131 let c_json = CString::new(json)?; 132 let c_json_ptr = c_json.as_ptr(); 133 134 // Get the length of the string 135 let len = json.len() as libc::c_int; 136 137 let res = unsafe { 138 bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr()) 139 }; 140 141 if res == 0 { 142 return Err(Error::NoteProcessFailed); 143 } 144 145 Ok(()) 146 } 147 148 /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]` 149 /// This function returns immediately and doesn't provide any information on 150 /// if ingestion was successful or not. 151 #[deprecated( 152 note = "Use `process_event_with` with IngestMetadata::new().client(false).relay(...)" 153 )] 154 pub fn process_event(&self, json: &str) -> Result<()> { 155 self.process_event_with(json, IngestMetadata::new().client(false)) 156 } 157 158 /// Ingest a client-sent event in the form `["EVENT", {"id:"...}]` 159 /// This function returns immediately and doesn't provide any information on 160 /// if ingestion was successful or not. 
161 #[deprecated( 162 note = "Use `process_event_with` with IngestMetadata::new().client(true).relay(...)" 163 )] 164 pub fn process_client_event(&self, json: &str) -> Result<()> { 165 self.process_event_with(json, IngestMetadata::new().client(true)) 166 } 167 168 pub fn query<'a>( 169 &self, 170 txn: &'a Transaction, 171 filters: &[Filter], 172 max_results: i32, 173 ) -> Result<Vec<QueryResult<'a>>> { 174 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect(); 175 let mut out: Vec<bindings::ndb_query_result> = vec![]; 176 let mut returned: i32 = 0; 177 out.reserve_exact(max_results as usize); 178 let res = unsafe { 179 bindings::ndb_query( 180 txn.as_mut_ptr(), 181 ndb_filters.as_mut_ptr(), 182 ndb_filters.len() as i32, 183 out.as_mut_ptr(), 184 max_results, 185 &mut returned as *mut i32, 186 ) 187 }; 188 if res == 1 { 189 unsafe { 190 out.set_len(returned as usize); 191 }; 192 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect()) 193 } else { 194 Err(Error::QueryError) 195 } 196 } 197 198 pub fn subscription_count(&self) -> u32 { 199 unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 } 200 } 201 202 pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> { 203 let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) }; 204 205 debug!( 206 "unsubscribed from {}, sub count {}", 207 sub.id(), 208 self.subscription_count() 209 ); 210 211 // mark the subscription as done if it exists in our stream map 212 { 213 let mut map = self.subs.lock().unwrap(); 214 if let Entry::Occupied(mut entry) = map.entry(sub) { 215 entry.get_mut().done = true; 216 } 217 } 218 219 if r == 0 { 220 Err(Error::SubscriptionError) 221 } else { 222 Ok(()) 223 } 224 } 225 226 pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> { 227 unsafe { 228 let mut ndb_filters: Vec<bindings::ndb_filter> = 229 filters.iter().map(|a| a.data).collect(); 230 let id = bindings::ndb_subscribe( 231 self.as_ptr(), 232 
ndb_filters.as_mut_ptr(), 233 filters.len() as i32, 234 ); 235 if id == 0 { 236 Err(Error::SubscriptionError) 237 } else { 238 Ok(Subscription::new(id)) 239 } 240 } 241 } 242 243 pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> { 244 let mut vec = vec![]; 245 vec.reserve_exact(max_notes as usize); 246 247 unsafe { 248 let res = bindings::ndb_poll_for_notes( 249 self.as_ptr(), 250 sub.id(), 251 vec.as_mut_ptr(), 252 max_notes as c_int, 253 ); 254 vec.set_len(res as usize); 255 }; 256 257 vec.into_iter().map(NoteKey::new).collect() 258 } 259 260 pub async fn wait_for_notes( 261 &self, 262 sub_id: Subscription, 263 max_notes: u32, 264 ) -> Result<Vec<NoteKey>> { 265 let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes); 266 267 match stream.next().await { 268 Some(res) => Ok(res), 269 None => Err(Error::SubscriptionError), 270 } 271 } 272 273 pub fn get_profile_by_key<'a>( 274 &self, 275 transaction: &'a Transaction, 276 key: ProfileKey, 277 ) -> Result<ProfileRecord<'a>> { 278 let mut len: usize = 0; 279 280 let profile_record_ptr = unsafe { 281 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len) 282 }; 283 284 if profile_record_ptr.is_null() { 285 // Handle null pointer (e.g., note not found or error occurred) 286 return Err(Error::NotFound); 287 } 288 289 // Convert the raw pointer to a Note instance 290 Ok(ProfileRecord::new_transactional( 291 profile_record_ptr, 292 len, 293 key, 294 transaction, 295 )) 296 } 297 298 pub fn get_profile_by_pubkey<'a>( 299 &self, 300 transaction: &'a Transaction, 301 id: &[u8; 32], 302 ) -> Result<ProfileRecord<'a>> { 303 let mut len: usize = 0; 304 let mut primkey: u64 = 0; 305 306 let profile_record_ptr = unsafe { 307 bindings::ndb_get_profile_by_pubkey( 308 transaction.as_mut_ptr(), 309 id.as_ptr(), 310 &mut len, 311 &mut primkey, 312 ) 313 }; 314 315 if profile_record_ptr.is_null() { 316 // Handle null pointer (e.g., note not found 
or error occurred) 317 return Err(Error::NotFound); 318 } 319 320 // Convert the raw pointer to a Note instance 321 Ok(ProfileRecord::new_transactional( 322 profile_record_ptr, 323 len, 324 ProfileKey::new(primkey), 325 transaction, 326 )) 327 } 328 329 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> { 330 let res = unsafe { 331 bindings::ndb_get_notekey_by_id( 332 txn.as_mut_ptr(), 333 id.as_ptr() as *const ::std::os::raw::c_uchar, 334 ) 335 }; 336 337 if res == 0 { 338 return Err(Error::NotFound); 339 } 340 341 Ok(NoteKey::new(res)) 342 } 343 344 pub fn get_profilekey_by_pubkey( 345 &self, 346 txn: &Transaction, 347 pubkey: &[u8; 32], 348 ) -> Result<ProfileKey> { 349 let res = unsafe { 350 bindings::ndb_get_profilekey_by_pubkey( 351 txn.as_mut_ptr(), 352 pubkey.as_ptr() as *const ::std::os::raw::c_uchar, 353 ) 354 }; 355 356 if res == 0 { 357 return Err(Error::NotFound); 358 } 359 360 Ok(ProfileKey::new(res)) 361 } 362 363 pub fn get_blocks_by_key<'a>( 364 &self, 365 txn: &'a Transaction, 366 note_key: NoteKey, 367 ) -> Result<Blocks<'a>> { 368 let blocks_ptr = unsafe { 369 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64()) 370 }; 371 372 if blocks_ptr.is_null() { 373 return Err(Error::NotFound); 374 } 375 376 Ok(Blocks::new_transactional(blocks_ptr, txn)) 377 } 378 379 pub fn get_note_by_key<'a>( 380 &self, 381 transaction: &'a Transaction, 382 note_key: NoteKey, 383 ) -> Result<Note<'a>> { 384 let mut len: usize = 0; 385 386 let note_ptr = unsafe { 387 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len) 388 }; 389 390 if note_ptr.is_null() { 391 // Handle null pointer (e.g., note not found or error occurred) 392 return Err(Error::NotFound); 393 } 394 395 // Convert the raw pointer to a Note instance 396 Ok(Note::new_transactional( 397 note_ptr, 398 len, 399 note_key, 400 transaction, 401 )) 402 } 403 404 /// Get a note from the database. 
Takes a [Transaction] and a 32-byte [Note] Id 405 pub fn get_note_by_id<'a>( 406 &self, 407 transaction: &'a Transaction, 408 id: &[u8; 32], 409 ) -> Result<Note<'a>> { 410 let mut len: usize = 0; 411 let mut primkey: u64 = 0; 412 413 let note_ptr = unsafe { 414 bindings::ndb_get_note_by_id( 415 transaction.as_mut_ptr(), 416 id.as_ptr(), 417 &mut len, 418 &mut primkey, 419 ) 420 }; 421 422 if note_ptr.is_null() { 423 // Handle null pointer (e.g., note not found or error occurred) 424 return Err(Error::NotFound); 425 } 426 427 // Convert the raw pointer to a Note instance 428 Ok(Note::new_transactional( 429 note_ptr, 430 len, 431 NoteKey::new(primkey), 432 transaction, 433 )) 434 } 435 436 pub fn search_profile<'a>( 437 &self, 438 transaction: &'a Transaction, 439 search: &str, 440 limit: u32, 441 ) -> Result<Vec<&'a [u8; 32]>> { 442 let mut results = Vec::new(); 443 444 let mut ndb_search = ndb_search { 445 key: std::ptr::null_mut(), 446 profile_key: 0, 447 cursor: std::ptr::null_mut(), 448 }; 449 450 let c_query = CString::new(search).map_err(|_| Error::DecodeError)?; 451 452 let success = unsafe { 453 bindings::ndb_search_profile( 454 transaction.as_mut_ptr(), 455 &mut ndb_search as *mut ndb_search, 456 c_query.as_c_str().as_ptr(), 457 ) 458 }; 459 460 if success == 0 { 461 return Ok(results); 462 } 463 464 // Add the first result 465 if let Some(key) = unsafe { ndb_search.key.as_ref() } { 466 results.push(&key.id); 467 } 468 469 // Iterate through additional results up to the limit 470 let mut remaining = limit; 471 while remaining > 0 { 472 let next_success = 473 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) }; 474 475 if next_success == 0 { 476 break; 477 } 478 479 if let Some(key) = unsafe { ndb_search.key.as_ref() } { 480 results.push(&key.id); 481 } 482 483 remaining -= 1; 484 } 485 486 unsafe { 487 bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search); 488 } 489 490 Ok(results) 491 } 492 493 /// Get the 
underlying pointer to the context in C 494 pub fn as_ptr(&self) -> *mut bindings::ndb { 495 self.refs.ndb 496 } 497 } 498 499 #[cfg(test)] 500 mod tests { 501 use super::*; 502 use crate::config::Config; 503 use crate::test_util; 504 use tokio::time::{self, sleep, Duration}; 505 506 #[test] 507 fn ndb_init_works() { 508 let db = "target/testdbs/init_works"; 509 test_util::cleanup_db(db); 510 511 { 512 let cfg = Config::new(); 513 let _ = Ndb::new(db, &cfg).expect("ok"); 514 } 515 } 516 517 #[tokio::test] 518 async fn query_works() { 519 let db = "target/testdbs/query"; 520 test_util::cleanup_db(&db); 521 522 { 523 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 524 525 let filter = Filter::new().kinds(vec![1]).build(); 526 let filters = vec![filter]; 527 528 let sub = ndb.subscribe(&filters).expect("sub_id"); 529 let waiter = ndb.wait_for_notes(sub, 1); 530 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 531 let res = waiter.await.expect("await ok"); 532 assert_eq!(res, vec![NoteKey::new(1)]); 533 let txn = Transaction::new(&ndb).expect("txn"); 534 let res = ndb.query(&txn, &filters, 1).expect("query ok"); 535 assert_eq!(res.len(), 1); 536 assert_eq!( 537 hex::encode(res[0].note.id()), 538 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3" 539 ); 540 } 541 } 542 543 #[tokio::test] 544 async fn search_profile_works() { 545 let db = "target/testdbs/search_profile"; 546 test_util::cleanup_db(&db); 547 548 { 549 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 550 551 let filter = Filter::new().kinds(vec![0]).build(); 552 let filters = vec![filter]; 553 554 let sub_id = 
ndb.subscribe(&filters).expect("sub_id"); 555 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 556 ndb.process_event(r#"["EVENT","b",{ "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c", "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24", "created_at": 1736785355, "kind": 0, "tags": [ [ "alt", "User profile for Derek Ross" ], [ "i", "twitter:derekmross", "1634343988407726081" ], [ "i", "github:derekross", "3edaf845975fa4500496a15039323fa3I" ] ], "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}", "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap(); 557 ndb.process_event(r#"["EVENT","b",{ "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf", "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967", "created_at": 1736017863, "kind": 0, "tags": [ [ "client", "Damus Notedeck" ] ], "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}", "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap(); 558 ndb.process_event(r#"["EVENT","b",{ "id": "3e9e3b63a7831f09bf2963616a2440e6f30c6e95adbc7841d59376ec100ae9dc", "pubkey": "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245", "created_at": 1737466417, "kind": 0, "tags": [], "content": 
"{\"banner\":\"https://nostr.build/i/3d6f22d45d95ecc2c19b1acdec57aa15f2dba9c423b536e26fc62707c125f557.jpg\",\"website\":\"https://damus.io\",\"nip05\":\"_@jb55.com\",\"display_name\":\"\",\"about\":\"I made damus, zaps, and npubs. Bitcoin core, lightning, and nostr dev. \",\"picture\":\"https://cdn.jb55.com/img/red-me.jpg\",\"name\":\"jb55\",\"lud16\":\"jb55@sendsats.lol\"}", "sig": "9cf1c89a4dbb2888e0f5fc300e56f93eb788bd84d3d0f8b52e4ac4abdd92256b0fb694bfd82d917c3923f01e8eac7886bb75c8043dcd9d4e070e4eaa5ab3bd0a"}]"#).unwrap(); 559 for _ in 0..3 { 560 let _ = sub.next().await; 561 } 562 let txn = Transaction::new(&ndb).expect("txn"); 563 564 let res = ndb.search_profile(&txn, "jb55", 1); 565 assert!(res.is_ok()); 566 let res = res.unwrap(); 567 assert!(res.len() >= 1); 568 let will_bytes: [u8; 32] = [ 569 0x32, 0xe1, 0x82, 0x76, 0x35, 0x45, 0x0e, 0xbb, 0x3c, 0x5a, 0x7d, 0x12, 0xc1, 0xf8, 570 0xe7, 0xb2, 0xb5, 0x14, 0x43, 0x9a, 0xc1, 0x0a, 0x67, 0xee, 0xf3, 0xd9, 0xfd, 0x9c, 571 0x5c, 0x68, 0xe2, 0x45, 572 ]; 573 assert_eq!(will_bytes, **res.first().unwrap()); 574 575 let res = ndb.search_profile(&txn, "kernel", 1); 576 assert!(res.is_ok()); 577 let res = res.unwrap(); 578 assert!(res.len() >= 1); 579 let kernelkind_bytes: [u8; 32] = [ 580 0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57, 581 0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55, 582 0x5f, 0x57, 0x49, 0x67, 583 ]; 584 assert_eq!(kernelkind_bytes, **res.first().unwrap()); 585 586 let res = ndb.search_profile(&txn, "Derek", 1); 587 assert!(res.is_ok()); 588 let res = res.unwrap(); 589 assert!(res.len() >= 1); 590 let derek_bytes: [u8; 32] = [ 591 0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23, 592 0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87, 593 0x7a, 0x74, 0x5b, 0x24, 594 ]; 595 assert_eq!(derek_bytes, **res.first().unwrap()); 596 } 597 } 598 599 #[tokio::test] 600 
async fn subscribe_event_works() { 601 let db = "target/testdbs/subscribe"; 602 test_util::cleanup_db(&db); 603 604 { 605 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 606 607 let filter = Filter::new().kinds(vec![1]).build(); 608 609 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 610 let waiter = ndb.wait_for_notes(sub, 1); 611 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 612 let res = waiter.await.expect("await ok"); 613 assert_eq!(res, vec![NoteKey::new(1)]); 614 } 615 } 616 617 #[tokio::test] 618 async fn multiple_events_work() { 619 let db = "target/testdbs/multiple_events"; 620 test_util::cleanup_db(&db); 621 622 { 623 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 624 625 let filter = Filter::new().kinds(vec![1]).build(); 626 627 let sub_id = ndb.subscribe(&[filter]).expect("sub_id"); 628 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 629 630 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 631 
ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok"); 632 ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok"); 633 634 // this pause causes problems 635 sleep(Duration::from_millis(100)).await; 636 637 ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. 
You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok"); 638 ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok"); 639 ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok"); 640 641 let timeout_duration = Duration::from_secs(2); 642 let result = time::timeout(timeout_duration, async { 643 let mut 
count = 0; 644 while count < 6 { 645 let res = sub.next(); 646 let _ = res.await.expect("await ok"); 647 count += 1; 648 println!("saw an event, count = {}", count); 649 } 650 }) 651 .await; 652 653 match result { 654 Ok(_) => println!("Test completed successfully"), 655 Err(_) => panic!("Test timed out"), 656 } 657 } 658 } 659 660 #[tokio::test] 661 async fn multiple_events_with_final_pause_work() { 662 let db = "target/testdbs/multiple_events_with_final_pause"; 663 test_util::cleanup_db(&db); 664 665 { 666 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 667 668 let filter = Filter::new().kinds(vec![1]).build(); 669 670 let sub_id = ndb.subscribe(&[filter]).expect("sub_id"); 671 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 672 673 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 674 ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok"); 675 
ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok"); 676 677 sleep(Duration::from_millis(100)).await; 678 679 ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. 
You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok"); 680 ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok"); 681 ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok"); 682 683 // this final pause causes extra problems 684 sleep(Duration::from_millis(100)).await; 685 686 let timeout_duration 
= Duration::from_secs(2); 687 let result = time::timeout(timeout_duration, async { 688 let mut count = 0; 689 while count < 6 { 690 let res = sub.next(); 691 let _ = res.await.expect("await ok"); 692 count += 1; 693 println!("saw an event, count = {}", count); 694 } 695 }) 696 .await; 697 698 match result { 699 Ok(_) => println!("Test completed successfully"), 700 Err(_) => panic!("Test timed out"), 701 } 702 } 703 } 704 705 #[test] 706 fn poll_note_works() { 707 let db = "target/testdbs/poll"; 708 test_util::cleanup_db(&db); 709 710 { 711 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 712 713 let filter = Filter::new().kinds(vec![1]).build(); 714 715 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 716 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 717 // this is too fast, we should have nothing 718 let res = ndb.poll_for_notes(sub, 1); 719 assert_eq!(res, vec![]); 720 721 std::thread::sleep(std::time::Duration::from_millis(150)); 722 // now we should have something 723 let res = ndb.poll_for_notes(sub, 1); 724 assert_eq!(res, vec![NoteKey::new(1)]); 725 } 726 } 727 728 #[test] 729 fn process_event_works() { 730 let db = "target/testdbs/event_works"; 731 test_util::cleanup_db(&db); 732 733 { 734 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 735 ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": 
"2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 736 } 737 738 { 739 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 740 let id = 741 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3") 742 .expect("hex id"); 743 let mut txn = Transaction::new(&ndb).expect("txn"); 744 let id_bytes: [u8; 32] = id.try_into().expect("id bytes"); 745 let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note"); 746 assert_eq!(note.kind(), 1); 747 } 748 } 749 750 #[test] 751 #[cfg(target_os = "windows")] 752 fn test_windows_large_mapsize() { 753 use std::{fs, path::Path}; 754 755 let db = "target/testdbs/windows_large_mapsize"; 756 test_util::cleanup_db(&db); 757 758 { 759 // 32 TiB should be way too big for CI 760 let config = 761 Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize); 762 763 // in this case, nostrdb should try to keep resizing to 764 // smaller mapsizes until success 765 766 let ndb = Ndb::new(db, &config); 767 768 assert!(ndb.is_ok()); 769 } 770 771 let file_len = fs::metadata(Path::new(db).join("data.mdb")) 772 .expect("metadata") 773 .len(); 774 775 assert!(file_len > 0); 776 777 if cfg!(target_os = "windows") { 778 // on windows the default mapsize will be 1MB when we fail 779 // to open it 780 assert_ne!(file_len, 1048576); 781 } else { 782 assert!(file_len < 1024u64 * 1024u64); 783 } 784 785 // we should definitely clean this up... 
especially on windows 786 test_util::cleanup_db(&db); 787 } 788 789 #[tokio::test] 790 async fn test_unsub_on_drop() { 791 let db = "target/testdbs/test_unsub_on_drop"; 792 test_util::cleanup_db(&db); 793 794 { 795 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 796 let sub_id = { 797 let filter = Filter::new().kinds(vec![1]).build(); 798 let filters = vec![filter]; 799 800 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 801 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 802 803 let res = sub.next(); 804 805 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 806 807 let res = res.await.expect("await ok"); 808 assert_eq!(res, vec![NoteKey::new(1)]); 809 810 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id)); 811 sub_id 812 }; 813 814 // ensure subscription state is removed after stream is dropped 815 assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id)); 816 assert_eq!(ndb.subscription_count(), 0); 817 } 818 819 test_util::cleanup_db(&db); 820 } 821 822 #[tokio::test] 823 async fn test_stream() { 824 let db = "target/testdbs/test_stream"; 825 test_util::cleanup_db(&db); 826 827 { 828 let mut ndb = Ndb::new(db, &Config::new()).expect("ndb"); 829 let sub_id = { 830 let filter = Filter::new().kinds(vec![1]).build(); 831 let filters = vec![filter]; 832 833 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 834 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 835 836 let res = sub.next(); 837 838 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": 
"32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 839 840 let res = res.await.expect("await ok"); 841 assert_eq!(res, vec![NoteKey::new(1)]); 842 843 // ensure that unsubscribing kills the stream 844 assert!(ndb.unsubscribe(sub_id).is_ok()); 845 assert!(sub.next().await.is_none()); 846 847 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id)); 848 sub_id 849 }; 850 851 // ensure subscription state is removed after stream is dropped 852 assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id)); 853 } 854 855 test_util::cleanup_db(&db); 856 } 857 }