ndb.rs (38972B)
1 use std::ffi::CString; 2 use std::ptr; 3 4 use crate::bindings::ndb_search; 5 use crate::{ 6 bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, NoteMetadata, 7 ProfileKey, ProfileRecord, QueryResult, Result, Subscription, SubscriptionState, 8 SubscriptionStream, Transaction, 9 }; 10 use futures::StreamExt; 11 use std::collections::hash_map::Entry; 12 use std::collections::HashMap; 13 use std::fs; 14 use std::os::raw::c_int; 15 use std::path::Path; 16 use std::sync::{Arc, Mutex}; 17 use tracing::debug; 18 19 #[derive(Debug)] 20 struct NdbRef { 21 ndb: *mut bindings::ndb, 22 rust_cb_ctx: *mut ::std::os::raw::c_void, 23 } 24 25 /// SAFETY: thread safety is ensured by nostrdb 26 unsafe impl Send for NdbRef {} 27 28 /// SAFETY: thread safety is ensured by nostrdb 29 unsafe impl Sync for NdbRef {} 30 31 /// The database is automatically closed when [Ndb] is [Drop]ped. 32 impl Drop for NdbRef { 33 fn drop(&mut self) { 34 unsafe { 35 bindings::ndb_destroy(self.ndb); 36 37 if !self.rust_cb_ctx.is_null() { 38 // Rebuild the Box from the raw pointer and drop it. 39 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>); 40 } 41 } 42 } 43 } 44 45 type SubMap = HashMap<Subscription, SubscriptionState>; 46 47 /// A nostrdb context. Construct one of these with [Ndb::new]. 48 #[derive(Debug, Clone)] 49 pub struct Ndb { 50 refs: Arc<NdbRef>, 51 52 /// Track query future states 53 pub(crate) subs: Arc<Mutex<SubMap>>, 54 } 55 56 impl Ndb { 57 /// Construct a new nostrdb context. Takes a directory where the database 58 /// is/will be located and a nostrdb config. 59 pub fn new(db_dir: &str, config: &Config) -> Result<Self> { 60 let db_dir_cstr = match CString::new(db_dir) { 61 Ok(cstr) => cstr, 62 Err(_) => return Err(Error::DbOpenFailed), 63 }; 64 let mut ndb: *mut bindings::ndb = ptr::null_mut(); 65 66 let path = Path::new(db_dir); 67 if !path.exists() { 68 let _ = fs::create_dir_all(path); 69 } 70 71 let min_mapsize = 1024 * 1024 * 512; 72 let mut mapsize = config.config.mapsize; 73 let config = *config; 74 75 let prev_callback = config.config.sub_cb; 76 let prev_callback_ctx = config.config.sub_cb_ctx; 77 let subs = Arc::new(Mutex::new(SubMap::default())); 78 let subs_clone = subs.clone(); 79 80 // We need to register our own callback so that we can wake 81 // query futures 82 let mut config = config.set_sub_callback(move |sub_id: u64| { 83 let mut map = subs_clone.lock().unwrap(); 84 if let Some(s) = map.get_mut(&Subscription::new(sub_id)) { 85 if let Some(w) = s.waker.take() { 86 w.wake(); 87 } 88 } 89 90 if let Some(pcb) = prev_callback { 91 unsafe { 92 pcb(prev_callback_ctx, sub_id); 93 }; 94 } 95 }); 96 97 let result = loop { 98 let result = 99 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) }; 100 101 if result == 0 { 102 mapsize /= 2; 103 config = config.set_mapsize(mapsize); 104 debug!("ndb init failed, reducing mapsize to {}", mapsize); 105 106 if mapsize > min_mapsize { 107 continue; 108 } else { 109 break 0; 110 } 111 } else { 112 break result; 113 } 114 }; 115 116 if result == 0 { 117 return Err(Error::DbOpenFailed); 118 } 119 120 let rust_cb_ctx = config.config.sub_cb_ctx; 121 let refs = Arc::new(NdbRef { ndb, rust_cb_ctx }); 122 123 Ok(Ndb { refs, subs }) 124 } 125 126 /// Ingest a relay or client sent event, with optional relay metadata. 127 /// This function returns immediately and doesn't provide any information on 128 /// if ingestion was successful or not. 129 pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> { 130 // Convert the Rust string to a C-style string 131 let c_json = CString::new(json)?; 132 let c_json_ptr = c_json.as_ptr(); 133 134 // Get the length of the string 135 let len = json.len() as libc::c_int; 136 137 let res = unsafe { 138 bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr()) 139 }; 140 141 if res == 0 { 142 return Err(Error::NoteProcessFailed); 143 } 144 145 Ok(()) 146 } 147 148 /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]` 149 /// This function returns immediately and doesn't provide any information on 150 /// if ingestion was successful or not. 151 pub fn process_event(&self, json: &str) -> Result<()> { 152 self.process_event_with(json, IngestMetadata::new().client(false)) 153 } 154 155 /// Ingest a client-sent event in the form `["EVENT", {"id:"...}]` 156 /// This function returns immediately and doesn't provide any information on 157 /// if ingestion was successful or not. 158 pub fn process_client_event(&self, json: &str) -> Result<()> { 159 self.process_event_with(json, IngestMetadata::new().client(true)) 160 } 161 162 /// Attempt to unwrap any unprocessed giftwraps 163 pub fn process_giftwraps(&self, txn: &Transaction) { 164 unsafe { 165 bindings::ndb_process_giftwraps(self.as_ptr(), txn.as_mut_ptr()); 166 } 167 } 168 169 /// Add a secret key to nostrdb's note ingester threads so that 170 /// nostrdb can unwrap incoming giftwraps. 171 pub fn add_key(&self, key: &[u8; 32]) -> bool { 172 unsafe { bindings::ndb_add_key(self.as_ptr(), key as *const u8 as *mut u8) != 0 } 173 } 174 175 pub fn query<'a>( 176 &self, 177 txn: &'a Transaction, 178 filters: &[Filter], 179 max_results: i32, 180 ) -> Result<Vec<QueryResult<'a>>> { 181 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect(); 182 let mut out: Vec<bindings::ndb_query_result> = vec![]; 183 let mut returned: i32 = 0; 184 out.reserve_exact(max_results as usize); 185 let res = unsafe { 186 bindings::ndb_query( 187 txn.as_mut_ptr(), 188 ndb_filters.as_mut_ptr(), 189 ndb_filters.len() as i32, 190 out.as_mut_ptr(), 191 max_results, 192 &mut returned as *mut i32, 193 ) 194 }; 195 if res == 1 { 196 unsafe { 197 out.set_len(returned as usize); 198 }; 199 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect()) 200 } else { 201 Err(Error::QueryError) 202 } 203 } 204 205 pub fn subscription_count(&self) -> u32 { 206 unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 } 207 } 208 209 pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> { 210 let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) }; 211 212 debug!( 213 "unsubscribed from {}, sub count {}", 214 sub.id(), 215 self.subscription_count() 216 ); 217 218 // mark the subscription as done if it exists in our stream map 219 { 220 let mut map = self.subs.lock().unwrap(); 221 if let Entry::Occupied(mut entry) = map.entry(sub) { 222 entry.get_mut().done = true; 223 } 224 } 225 226 if r == 0 { 227 Err(Error::SubscriptionError) 228 } else { 229 Ok(()) 230 } 231 } 232 233 pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> { 234 unsafe { 235 let mut ndb_filters: Vec<bindings::ndb_filter> = 236 filters.iter().map(|a| a.data).collect(); 237 let id = bindings::ndb_subscribe( 238 self.as_ptr(), 239 ndb_filters.as_mut_ptr(), 240 filters.len() as i32, 241 ); 242 if id == 0 { 243 Err(Error::SubscriptionError) 244 } else { 245 Ok(Subscription::new(id)) 246 } 247 } 248 } 249 250 pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> { 251 let mut vec = vec![]; 252 vec.reserve_exact(max_notes as usize); 253 254 unsafe { 255 let res = bindings::ndb_poll_for_notes( 256 self.as_ptr(), 257 sub.id(), 258 vec.as_mut_ptr(), 259 max_notes as c_int, 260 ); 261 vec.set_len(res as usize); 262 }; 263 264 vec.into_iter().map(NoteKey::new).collect() 265 } 266 267 pub async fn wait_for_notes( 268 &self, 269 sub_id: Subscription, 270 max_notes: u32, 271 ) -> Result<Vec<NoteKey>> { 272 let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes); 273 274 match stream.next().await { 275 Some(res) => Ok(res), 276 None => Err(Error::SubscriptionError), 277 } 278 } 279 280 pub fn get_profile_by_key<'a>( 281 &self, 282 transaction: &'a Transaction, 283 key: ProfileKey, 284 ) -> Result<ProfileRecord<'a>> { 285 let mut len: usize = 0; 286 287 let profile_record_ptr = unsafe { 288 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len) 289 }; 290 291 if profile_record_ptr.is_null() { 292 // Handle null pointer (e.g., note not found or error occurred) 293 return Err(Error::NotFound); 294 } 295 296 // Convert the raw pointer to a Note instance 297 Ok(ProfileRecord::new_transactional( 298 profile_record_ptr, 299 len, 300 key, 301 transaction, 302 )) 303 } 304 305 pub fn get_profile_by_pubkey<'a>( 306 &self, 307 transaction: &'a Transaction, 308 id: &[u8; 32], 309 ) -> Result<ProfileRecord<'a>> { 310 let mut len: usize = 0; 311 let mut primkey: u64 = 0; 312 313 let profile_record_ptr = unsafe { 314 bindings::ndb_get_profile_by_pubkey( 315 transaction.as_mut_ptr(), 316 id.as_ptr(), 317 &mut len, 318 &mut primkey, 319 ) 320 }; 321 322 if profile_record_ptr.is_null() { 323 // Handle null pointer (e.g., note not found or error occurred) 324 return Err(Error::NotFound); 325 } 326 327 // Convert the raw pointer to a Note instance 328 Ok(ProfileRecord::new_transactional( 329 profile_record_ptr, 330 len, 331 ProfileKey::new(primkey), 332 transaction, 333 )) 334 } 335 336 pub fn get_note_metadata<'a>( 337 &self, 338 txn: &'a Transaction, 339 id: &[u8; 32], 340 ) -> Result<NoteMetadata<'a>> { 341 let res = unsafe { 342 let res = bindings::ndb_get_note_meta( 343 txn.as_mut_ptr(), 344 id.as_ptr() as *const ::std::os::raw::c_uchar, 345 ); 346 347 if res.is_null() { 348 return Err(Error::NotFound); 349 } 350 351 &mut *res 352 }; 353 354 Ok(NoteMetadata::new(res)) 355 } 356 357 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> { 358 let res = unsafe { 359 bindings::ndb_get_notekey_by_id( 360 txn.as_mut_ptr(), 361 id.as_ptr() as *const ::std::os::raw::c_uchar, 362 ) 363 }; 364 365 if res == 0 { 366 return Err(Error::NotFound); 367 } 368 369 Ok(NoteKey::new(res)) 370 } 371 372 pub fn get_profilekey_by_pubkey( 373 &self, 374 txn: &Transaction, 375 pubkey: &[u8; 32], 376 ) -> Result<ProfileKey> { 377 let res = unsafe { 378 bindings::ndb_get_profilekey_by_pubkey( 379 txn.as_mut_ptr(), 380 pubkey.as_ptr() as *const ::std::os::raw::c_uchar, 381 ) 382 }; 383 384 if res == 0 { 385 return Err(Error::NotFound); 386 } 387 388 Ok(ProfileKey::new(res)) 389 } 390 391 pub fn get_blocks_by_key<'a>( 392 &self, 393 txn: &'a Transaction, 394 note_key: NoteKey, 395 ) -> Result<Blocks<'a>> { 396 let blocks_ptr = unsafe { 397 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64()) 398 }; 399 400 if blocks_ptr.is_null() { 401 return Err(Error::NotFound); 402 } 403 404 Ok(Blocks::new_transactional(blocks_ptr, txn)) 405 } 406 407 pub fn get_note_by_key<'a>( 408 &self, 409 transaction: &'a Transaction, 410 note_key: NoteKey, 411 ) -> Result<Note<'a>> { 412 let mut len: usize = 0; 413 414 let note_ptr = unsafe { 415 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len) 416 }; 417 418 if note_ptr.is_null() { 419 // Handle null pointer (e.g., note not found or error occurred) 420 return Err(Error::NotFound); 421 } 422 423 // Convert the raw pointer to a Note instance 424 Ok(Note::new_transactional( 425 note_ptr, 426 len, 427 note_key, 428 transaction, 429 )) 430 } 431 432 /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id 433 pub fn get_note_by_id<'a>( 434 &self, 435 transaction: &'a Transaction, 436 id: &[u8; 32], 437 ) -> Result<Note<'a>> { 438 let mut len: usize = 0; 439 let mut primkey: u64 = 0; 440 441 let note_ptr = unsafe { 442 bindings::ndb_get_note_by_id( 443 transaction.as_mut_ptr(), 444 id.as_ptr(), 445 &mut len, 446 &mut primkey, 447 ) 448 }; 449 450 if note_ptr.is_null() { 451 // Handle null pointer (e.g., note not found or error occurred) 452 return Err(Error::NotFound); 453 } 454 455 // Convert the raw pointer to a Note instance 456 Ok(Note::new_transactional( 457 note_ptr, 458 len, 459 NoteKey::new(primkey), 460 transaction, 461 )) 462 } 463 464 pub fn search_profile<'a>( 465 &self, 466 transaction: &'a Transaction, 467 search: &str, 468 limit: u32, 469 ) -> Result<Vec<&'a [u8; 32]>> { 470 let mut results = Vec::new(); 471 472 let mut ndb_search = ndb_search { 473 key: std::ptr::null_mut(), 474 profile_key: 0, 475 cursor: std::ptr::null_mut(), 476 }; 477 478 let c_query = CString::new(search).map_err(|_| Error::DecodeError)?; 479 480 let success = unsafe { 481 bindings::ndb_search_profile( 482 transaction.as_mut_ptr(), 483 &mut ndb_search as *mut ndb_search, 484 c_query.as_c_str().as_ptr(), 485 ) 486 }; 487 488 if success == 0 { 489 return Ok(results); 490 } 491 492 // Add the first result 493 if let Some(key) = unsafe { ndb_search.key.as_ref() } { 494 results.push(&key.id); 495 } 496 497 // Iterate through additional results up to the limit 498 let mut remaining = limit; 499 while remaining > 0 { 500 let next_success = 501 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) }; 502 503 if next_success == 0 { 504 break; 505 } 506 507 if let Some(key) = unsafe { ndb_search.key.as_ref() } { 508 results.push(&key.id); 509 } 510 511 remaining -= 1; 512 } 513 514 unsafe { 515 bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search); 516 } 517 518 Ok(results) 519 } 520 521 /// Get the underlying pointer to the context in C 522 pub fn as_ptr(&self) -> *mut bindings::ndb { 523 self.refs.ndb 524 } 525 } 526 527 #[cfg(test)] 528 mod tests { 529 use super::*; 530 use crate::config::Config; 531 use crate::test_util; 532 use tokio::time::{self, sleep, Duration}; 533 534 #[test] 535 fn ndb_init_works() { 536 let db = "target/testdbs/init_works"; 537 test_util::cleanup_db(db); 538 539 { 540 let cfg = Config::new(); 541 let _ = Ndb::new(db, &cfg).expect("ok"); 542 } 543 } 544 545 #[tokio::test] 546 async fn query_works() { 547 let db = "target/testdbs/query"; 548 test_util::cleanup_db(&db); 549 550 { 551 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 552 553 let filter = Filter::new().kinds(vec![1]).build(); 554 let filters = vec![filter]; 555 556 let sub = ndb.subscribe(&filters).expect("sub_id"); 557 let waiter = ndb.wait_for_notes(sub, 1); 558 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 559 let res = waiter.await.expect("await ok"); 560 assert_eq!(res, vec![NoteKey::new(1)]); 561 let txn = Transaction::new(&ndb).expect("txn"); 562 let res = ndb.query(&txn, &filters, 1).expect("query ok"); 563 assert_eq!(res.len(), 1); 564 assert_eq!( 565 hex::encode(res[0].note.id()), 566 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3" 567 ); 568 } 569 } 570 571 #[tokio::test] 572 async fn search_profile_works() { 573 let db = "target/testdbs/search_profile"; 574 test_util::cleanup_db(&db); 575 576 { 577 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 578 579 let filter = Filter::new().kinds(vec![0]).build(); 580 let filters = vec![filter]; 581 582 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 583 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 584 585 ndb.process_event(r#"["EVENT","b",{ "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c", "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24", "created_at": 1736785355, "kind": 0, "tags": [ [ "alt", "User profile for Derek Ross" ], [ "i", "twitter:derekmross", "1634343988407726081" ], [ "i", "github:derekross", "3edaf845975fa4500496a15039323fa3I" ] ], "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}", "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap(); 586 587 ndb.process_event(r#"["EVENT","b",{ "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf", "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967", "created_at": 1736017863, "kind": 0, "tags": [ [ "client", "Damus Notedeck" ] ], "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}", "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap(); 588 589 for _ in 0..2 { 590 let _ = sub.next().await; 591 } 592 let txn = Transaction::new(&ndb).expect("txn"); 593 594 let res = ndb.search_profile(&txn, "kernel", 1); 595 assert!(res.is_ok()); 596 let res = res.unwrap(); 597 assert!(res.len() >= 1); 598 let kernelkind_bytes: [u8; 32] = [ 599 0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57, 600 0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55, 601 0x5f, 0x57, 0x49, 0x67, 602 ]; 603 assert_eq!(kernelkind_bytes, **res.first().unwrap()); 604 605 let res = ndb.search_profile(&txn, "Derek", 1); 606 assert!(res.is_ok()); 607 let res = res.unwrap(); 608 assert!(res.len() >= 1); 609 let derek_bytes: [u8; 32] = [ 610 0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23, 611 0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87, 612 0x7a, 0x74, 0x5b, 0x24, 613 ]; 614 assert_eq!(derek_bytes, **res.first().unwrap()); 615 } 616 } 617 618 #[tokio::test] 619 async fn subscribe_event_works() { 620 let db = "target/testdbs/subscribe"; 621 test_util::cleanup_db(&db); 622 623 { 624 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 625 626 let filter = Filter::new().kinds(vec![1]).build(); 627 628 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 629 let waiter = ndb.wait_for_notes(sub, 1); 630 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 631 let res = waiter.await.expect("await ok"); 632 assert_eq!(res, vec![NoteKey::new(1)]); 633 } 634 } 635 636 #[tokio::test] 637 async fn multiple_events_work() { 638 let db = "target/testdbs/multiple_events"; 639 test_util::cleanup_db(&db); 640 641 { 642 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 643 644 let filter = Filter::new().kinds(vec![1]).build(); 645 646 let sub_id = ndb.subscribe(&[filter]).expect("sub_id"); 647 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 648 649 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 650 ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok"); 651 ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok"); 652 653 // this pause causes problems 654 sleep(Duration::from_millis(100)).await; 655 656 ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok"); 657 ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok"); 658 ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok"); 659 660 let timeout_duration = Duration::from_secs(2); 661 let result = time::timeout(timeout_duration, async { 662 let mut count = 0; 663 while count < 6 { 664 let res = sub.next(); 665 let _ = res.await.expect("await ok"); 666 count += 1; 667 println!("saw an event, count = {}", count); 668 } 669 }) 670 .await; 671 672 match result { 673 Ok(_) => println!("Test completed successfully"), 674 Err(_) => panic!("Test timed out"), 675 } 676 } 677 } 678 679 #[tokio::test] 680 async fn multiple_events_with_final_pause_work() { 681 let db = "target/testdbs/multiple_events_with_final_pause"; 682 test_util::cleanup_db(&db); 683 684 { 685 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 686 687 let filter = Filter::new().kinds(vec![1]).build(); 688 689 let sub_id = ndb.subscribe(&[filter]).expect("sub_id"); 690 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 691 692 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 693 ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok"); 694 ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok"); 695 696 sleep(Duration::from_millis(100)).await; 697 698 ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok"); 699 ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok"); 700 ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok"); 701 702 // this final pause causes extra problems 703 sleep(Duration::from_millis(100)).await; 704 705 let timeout_duration = Duration::from_secs(2); 706 let result = time::timeout(timeout_duration, async { 707 let mut count = 0; 708 while count < 6 { 709 let res = sub.next(); 710 let _ = res.await.expect("await ok"); 711 count += 1; 712 println!("saw an event, count = {}", count); 713 } 714 }) 715 .await; 716 717 match result { 718 Ok(_) => println!("Test completed successfully"), 719 Err(_) => panic!("Test timed out"), 720 } 721 } 722 } 723 724 #[test] 725 fn poll_note_works() { 726 let db = "target/testdbs/poll"; 727 test_util::cleanup_db(&db); 728 729 { 730 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 731 732 let filter = Filter::new().kinds(vec![1]).build(); 733 734 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 735 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 736 // this is too fast, we should have nothing 737 let res = ndb.poll_for_notes(sub, 1); 738 assert_eq!(res, vec![]); 739 740 std::thread::sleep(std::time::Duration::from_millis(150)); 741 // now we should have something 742 let res = ndb.poll_for_notes(sub, 1); 743 assert_eq!(res, vec![NoteKey::new(1)]); 744 } 745 } 746 747 #[test] 748 fn process_event_works() { 749 let db = "target/testdbs/event_works"; 750 test_util::cleanup_db(&db); 751 752 { 753 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 754 ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 755 } 756 757 { 758 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 759 let id = 760 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3") 761 .expect("hex id"); 762 let mut txn = Transaction::new(&ndb).expect("txn"); 763 let id_bytes: [u8; 32] = id.try_into().expect("id bytes"); 764 let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note"); 765 assert_eq!(note.kind(), 1); 766 } 767 } 768 769 #[test] 770 #[cfg(target_os = "windows")] 771 fn test_windows_large_mapsize() { 772 use std::{fs, path::Path}; 773 774 let db = "target/testdbs/windows_large_mapsize"; 775 test_util::cleanup_db(&db); 776 777 { 778 // 32 TiB should be way too big for CI 779 let config = 780 Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize); 781 782 // in this case, nostrdb should try to keep resizing to 783 // smaller mapsizes until success 784 785 let ndb = Ndb::new(db, &config); 786 787 assert!(ndb.is_ok()); 788 } 789 790 let file_len = fs::metadata(Path::new(db).join("data.mdb")) 791 .expect("metadata") 792 .len(); 793 794 assert!(file_len > 0); 795 796 if cfg!(target_os = "windows") { 797 // on windows the default mapsize will be 1MB when we fail 798 // to open it 799 assert_ne!(file_len, 1048576); 800 } else { 801 assert!(file_len < 1024u64 * 1024u64); 802 } 803 804 // we should definitely clean this up... especially on windows 805 test_util::cleanup_db(&db); 806 } 807 808 #[tokio::test] 809 async fn test_unsub_on_drop() { 810 let db = "target/testdbs/test_unsub_on_drop"; 811 test_util::cleanup_db(&db); 812 813 { 814 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 815 let sub_id = { 816 let filter = Filter::new().kinds(vec![1]).build(); 817 let filters = vec![filter]; 818 819 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 820 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 821 822 let res = sub.next(); 823 824 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 825 826 let res = res.await.expect("await ok"); 827 assert_eq!(res, vec![NoteKey::new(1)]); 828 829 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id)); 830 sub_id 831 }; 832 833 // ensure subscription state is removed after stream is dropped 834 assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id)); 835 assert_eq!(ndb.subscription_count(), 0); 836 } 837 838 test_util::cleanup_db(&db); 839 } 840 841 #[tokio::test] 842 async fn test_stream() { 843 let db = "target/testdbs/test_stream"; 844 test_util::cleanup_db(&db); 845 846 { 847 let mut ndb = Ndb::new(db, &Config::new()).expect("ndb"); 848 let sub_id = { 849 let filter = Filter::new().kinds(vec![1]).build(); 850 let filters = vec![filter]; 851 852 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 853 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 854 855 let res = sub.next(); 856 857 let _ = ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 858 859 let _ = ndb.process_event(r#"["EVENT",{"content":"👀","created_at":1761514455,"id":"66af95a6bdfec756344f48241562b684082ff9c76ea940c11c4fd85e91e1219c","kind":7,"pubkey":"d5805ae449e108e907091c67cdf49a9835b3cac3dd11489ad215c0ddf7c658fc","sig":"69f4a3fe7c1cc6aa9c9cc4a2e90e4b71c3b9afaad262e68b92336e0493ff1a748b5dcc20ab6e86d4551dc5ea680ddfa1c08d47f9e4845927e143e8ef2183479b","tags":[["e","d44ad96cb8924092a76bc2afddeb12eb85233c0d03a7d9adc42c2a85a79a4305","wss://relay.primal.net/","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],["p","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9","wss://relay.primal.net/"],["k","1"]]}]"#); 860 let res = res.await.expect("await ok"); 861 assert_eq!(res, vec![NoteKey::new(1)]); 862 863 // ensure that unsubscribing kills the stream 864 assert!(ndb.unsubscribe(sub_id).is_ok()); 865 assert!(sub.next().await.is_none()); 866 867 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id)); 868 sub_id 869 }; 870 871 // ensure subscription state is removed after stream is dropped 872 assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id)); 873 } 874 875 test_util::cleanup_db(&db); 876 } 877 }