ndb.rs (38487B)
1 use std::ffi::CString; 2 use std::ptr; 3 4 use crate::bindings::ndb_search; 5 use crate::{ 6 bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, NoteMetadata, 7 ProfileKey, ProfileRecord, QueryResult, Result, Subscription, SubscriptionState, 8 SubscriptionStream, Transaction, 9 }; 10 use futures::StreamExt; 11 use std::collections::hash_map::Entry; 12 use std::collections::HashMap; 13 use std::fs; 14 use std::os::raw::c_int; 15 use std::path::Path; 16 use std::sync::{Arc, Mutex}; 17 use tracing::debug; 18 19 #[derive(Debug)] 20 struct NdbRef { 21 ndb: *mut bindings::ndb, 22 rust_cb_ctx: *mut ::std::os::raw::c_void, 23 } 24 25 /// SAFETY: thread safety is ensured by nostrdb 26 unsafe impl Send for NdbRef {} 27 28 /// SAFETY: thread safety is ensured by nostrdb 29 unsafe impl Sync for NdbRef {} 30 31 /// The database is automatically closed when [Ndb] is [Drop]ped. 32 impl Drop for NdbRef { 33 fn drop(&mut self) { 34 unsafe { 35 bindings::ndb_destroy(self.ndb); 36 37 if !self.rust_cb_ctx.is_null() { 38 // Rebuild the Box from the raw pointer and drop it. 39 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>); 40 } 41 } 42 } 43 } 44 45 type SubMap = HashMap<Subscription, SubscriptionState>; 46 47 /// A nostrdb context. Construct one of these with [Ndb::new]. 48 #[derive(Debug, Clone)] 49 pub struct Ndb { 50 refs: Arc<NdbRef>, 51 52 /// Track query future states 53 pub(crate) subs: Arc<Mutex<SubMap>>, 54 } 55 56 impl Ndb { 57 /// Construct a new nostrdb context. Takes a directory where the database 58 /// is/will be located and a nostrdb config. 59 pub fn new(db_dir: &str, config: &Config) -> Result<Self> { 60 let db_dir_cstr = match CString::new(db_dir) { 61 Ok(cstr) => cstr, 62 Err(_) => return Err(Error::DbOpenFailed), 63 }; 64 let mut ndb: *mut bindings::ndb = ptr::null_mut(); 65 66 let path = Path::new(db_dir); 67 if !path.exists() { 68 let _ = fs::create_dir_all(path); 69 } 70 71 let min_mapsize = 1024 * 1024 * 512; 72 let mut mapsize = config.config.mapsize; 73 let config = *config; 74 75 let prev_callback = config.config.sub_cb; 76 let prev_callback_ctx = config.config.sub_cb_ctx; 77 let subs = Arc::new(Mutex::new(SubMap::default())); 78 let subs_clone = subs.clone(); 79 80 // We need to register our own callback so that we can wake 81 // query futures 82 let mut config = config.set_sub_callback(move |sub_id: u64| { 83 let mut map = subs_clone.lock().unwrap(); 84 if let Some(s) = map.get_mut(&Subscription::new(sub_id)) { 85 if let Some(w) = s.waker.take() { 86 w.wake(); 87 } 88 } 89 90 if let Some(pcb) = prev_callback { 91 unsafe { 92 pcb(prev_callback_ctx, sub_id); 93 }; 94 } 95 }); 96 97 let result = loop { 98 let result = 99 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) }; 100 101 if result == 0 { 102 mapsize /= 2; 103 config = config.set_mapsize(mapsize); 104 debug!("ndb init failed, reducing mapsize to {}", mapsize); 105 106 if mapsize > min_mapsize { 107 continue; 108 } else { 109 break 0; 110 } 111 } else { 112 break result; 113 } 114 }; 115 116 if result == 0 { 117 return Err(Error::DbOpenFailed); 118 } 119 120 let rust_cb_ctx = config.config.sub_cb_ctx; 121 let refs = Arc::new(NdbRef { ndb, rust_cb_ctx }); 122 123 Ok(Ndb { refs, subs }) 124 } 125 126 /// Ingest a relay or client sent event, with optional relay metadata. 127 /// This function returns immediately and doesn't provide any information on 128 /// if ingestion was successful or not. 129 pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> { 130 // Convert the Rust string to a C-style string 131 let c_json = CString::new(json)?; 132 let c_json_ptr = c_json.as_ptr(); 133 134 // Get the length of the string 135 let len = json.len() as libc::c_int; 136 137 let res = unsafe { 138 bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr()) 139 }; 140 141 if res == 0 { 142 return Err(Error::NoteProcessFailed); 143 } 144 145 Ok(()) 146 } 147 148 /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]` 149 /// This function returns immediately and doesn't provide any information on 150 /// if ingestion was successful or not. 151 pub fn process_event(&self, json: &str) -> Result<()> { 152 self.process_event_with(json, IngestMetadata::new().client(false)) 153 } 154 155 /// Ingest a client-sent event in the form `["EVENT", {"id:"...}]` 156 /// This function returns immediately and doesn't provide any information on 157 /// if ingestion was successful or not. 158 pub fn process_client_event(&self, json: &str) -> Result<()> { 159 self.process_event_with(json, IngestMetadata::new().client(true)) 160 } 161 162 pub fn query<'a>( 163 &self, 164 txn: &'a Transaction, 165 filters: &[Filter], 166 max_results: i32, 167 ) -> Result<Vec<QueryResult<'a>>> { 168 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect(); 169 let mut out: Vec<bindings::ndb_query_result> = vec![]; 170 let mut returned: i32 = 0; 171 out.reserve_exact(max_results as usize); 172 let res = unsafe { 173 bindings::ndb_query( 174 txn.as_mut_ptr(), 175 ndb_filters.as_mut_ptr(), 176 ndb_filters.len() as i32, 177 out.as_mut_ptr(), 178 max_results, 179 &mut returned as *mut i32, 180 ) 181 }; 182 if res == 1 { 183 unsafe { 184 out.set_len(returned as usize); 185 }; 186 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect()) 187 } else { 188 Err(Error::QueryError) 189 } 190 } 191 192 pub fn subscription_count(&self) -> u32 { 193 unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 } 194 } 195 196 pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> { 197 let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) }; 198 199 debug!( 200 "unsubscribed from {}, sub count {}", 201 sub.id(), 202 self.subscription_count() 203 ); 204 205 // mark the subscription as done if it exists in our stream map 206 { 207 let mut map = self.subs.lock().unwrap(); 208 if let Entry::Occupied(mut entry) = map.entry(sub) { 209 entry.get_mut().done = true; 210 } 211 } 212 213 if r == 0 { 214 Err(Error::SubscriptionError) 215 } else { 216 Ok(()) 217 } 218 } 219 220 pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> { 221 unsafe { 222 let mut ndb_filters: Vec<bindings::ndb_filter> = 223 filters.iter().map(|a| a.data).collect(); 224 let id = bindings::ndb_subscribe( 225 self.as_ptr(), 226 ndb_filters.as_mut_ptr(), 227 filters.len() as i32, 228 ); 229 if id == 0 { 230 Err(Error::SubscriptionError) 231 } else { 232 Ok(Subscription::new(id)) 233 } 234 } 235 } 236 237 pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> { 238 let mut vec = vec![]; 239 vec.reserve_exact(max_notes as usize); 240 241 unsafe { 242 let res = bindings::ndb_poll_for_notes( 243 self.as_ptr(), 244 sub.id(), 245 vec.as_mut_ptr(), 246 max_notes as c_int, 247 ); 248 vec.set_len(res as usize); 249 }; 250 251 vec.into_iter().map(NoteKey::new).collect() 252 } 253 254 pub async fn wait_for_notes( 255 &self, 256 sub_id: Subscription, 257 max_notes: u32, 258 ) -> Result<Vec<NoteKey>> { 259 let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes); 260 261 match stream.next().await { 262 Some(res) => Ok(res), 263 None => Err(Error::SubscriptionError), 264 } 265 } 266 267 pub fn get_profile_by_key<'a>( 268 &self, 269 transaction: &'a Transaction, 270 key: ProfileKey, 271 ) -> Result<ProfileRecord<'a>> { 272 let mut len: usize = 0; 273 274 let profile_record_ptr = unsafe { 275 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len) 276 }; 277 278 if profile_record_ptr.is_null() { 279 // Handle null pointer (e.g., note not found or error occurred) 280 return Err(Error::NotFound); 281 } 282 283 // Convert the raw pointer to a Note instance 284 Ok(ProfileRecord::new_transactional( 285 profile_record_ptr, 286 len, 287 key, 288 transaction, 289 )) 290 } 291 292 pub fn get_profile_by_pubkey<'a>( 293 &self, 294 transaction: &'a Transaction, 295 id: &[u8; 32], 296 ) -> Result<ProfileRecord<'a>> { 297 let mut len: usize = 0; 298 let mut primkey: u64 = 0; 299 300 let profile_record_ptr = unsafe { 301 bindings::ndb_get_profile_by_pubkey( 302 transaction.as_mut_ptr(), 303 id.as_ptr(), 304 &mut len, 305 &mut primkey, 306 ) 307 }; 308 309 if profile_record_ptr.is_null() { 310 // Handle null pointer (e.g., note not found or error occurred) 311 return Err(Error::NotFound); 312 } 313 314 // Convert the raw pointer to a Note instance 315 Ok(ProfileRecord::new_transactional( 316 profile_record_ptr, 317 len, 318 ProfileKey::new(primkey), 319 transaction, 320 )) 321 } 322 323 pub fn get_note_metadata<'a>( 324 &self, 325 txn: &'a Transaction, 326 id: &[u8; 32], 327 ) -> Result<NoteMetadata<'a>> { 328 let res = unsafe { 329 let res = bindings::ndb_get_note_meta( 330 txn.as_mut_ptr(), 331 id.as_ptr() as *const ::std::os::raw::c_uchar, 332 ); 333 334 if res.is_null() { 335 return Err(Error::NotFound); 336 } 337 338 &mut *res 339 }; 340 341 Ok(NoteMetadata::new(res)) 342 } 343 344 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> { 345 let res = unsafe { 346 bindings::ndb_get_notekey_by_id( 347 txn.as_mut_ptr(), 348 id.as_ptr() as *const ::std::os::raw::c_uchar, 349 ) 350 }; 351 352 if res == 0 { 353 return Err(Error::NotFound); 354 } 355 356 Ok(NoteKey::new(res)) 357 } 358 359 pub fn get_profilekey_by_pubkey( 360 &self, 361 txn: &Transaction, 362 pubkey: &[u8; 32], 363 ) -> Result<ProfileKey> { 364 let res = unsafe { 365 bindings::ndb_get_profilekey_by_pubkey( 366 txn.as_mut_ptr(), 367 pubkey.as_ptr() as *const ::std::os::raw::c_uchar, 368 ) 369 }; 370 371 if res == 0 { 372 return Err(Error::NotFound); 373 } 374 375 Ok(ProfileKey::new(res)) 376 } 377 378 pub fn get_blocks_by_key<'a>( 379 &self, 380 txn: &'a Transaction, 381 note_key: NoteKey, 382 ) -> Result<Blocks<'a>> { 383 let blocks_ptr = unsafe { 384 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64()) 385 }; 386 387 if blocks_ptr.is_null() { 388 return Err(Error::NotFound); 389 } 390 391 Ok(Blocks::new_transactional(blocks_ptr, txn)) 392 } 393 394 pub fn get_note_by_key<'a>( 395 &self, 396 transaction: &'a Transaction, 397 note_key: NoteKey, 398 ) -> Result<Note<'a>> { 399 let mut len: usize = 0; 400 401 let note_ptr = unsafe { 402 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len) 403 }; 404 405 if note_ptr.is_null() { 406 // Handle null pointer (e.g., note not found or error occurred) 407 return Err(Error::NotFound); 408 } 409 410 // Convert the raw pointer to a Note instance 411 Ok(Note::new_transactional( 412 note_ptr, 413 len, 414 note_key, 415 transaction, 416 )) 417 } 418 419 /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id 420 pub fn get_note_by_id<'a>( 421 &self, 422 transaction: &'a Transaction, 423 id: &[u8; 32], 424 ) -> Result<Note<'a>> { 425 let mut len: usize = 0; 426 let mut primkey: u64 = 0; 427 428 let note_ptr = unsafe { 429 bindings::ndb_get_note_by_id( 430 transaction.as_mut_ptr(), 431 id.as_ptr(), 432 &mut len, 433 &mut primkey, 434 ) 435 }; 436 437 if note_ptr.is_null() { 438 // Handle null pointer (e.g., note not found or error occurred) 439 return Err(Error::NotFound); 440 } 441 442 // Convert the raw pointer to a Note instance 443 Ok(Note::new_transactional( 444 note_ptr, 445 len, 446 NoteKey::new(primkey), 447 transaction, 448 )) 449 } 450 451 pub fn search_profile<'a>( 452 &self, 453 transaction: &'a Transaction, 454 search: &str, 455 limit: u32, 456 ) -> Result<Vec<&'a [u8; 32]>> { 457 let mut results = Vec::new(); 458 459 let mut ndb_search = ndb_search { 460 key: std::ptr::null_mut(), 461 profile_key: 0, 462 cursor: std::ptr::null_mut(), 463 }; 464 465 let c_query = CString::new(search).map_err(|_| Error::DecodeError)?; 466 467 let success = unsafe { 468 bindings::ndb_search_profile( 469 transaction.as_mut_ptr(), 470 &mut ndb_search as *mut ndb_search, 471 c_query.as_c_str().as_ptr(), 472 ) 473 }; 474 475 if success == 0 { 476 return Ok(results); 477 } 478 479 // Add the first result 480 if let Some(key) = unsafe { ndb_search.key.as_ref() } { 481 results.push(&key.id); 482 } 483 484 // Iterate through additional results up to the limit 485 let mut remaining = limit; 486 while remaining > 0 { 487 let next_success = 488 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) }; 489 490 if next_success == 0 { 491 break; 492 } 493 494 if let Some(key) = unsafe { ndb_search.key.as_ref() } { 495 results.push(&key.id); 496 } 497 498 remaining -= 1; 499 } 500 501 unsafe { 502 bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search); 503 } 504 505 Ok(results) 506 } 507 508 /// Get the underlying pointer to the context in C 509 pub fn as_ptr(&self) -> *mut bindings::ndb { 510 self.refs.ndb 511 } 512 } 513 514 #[cfg(test)] 515 mod tests { 516 use super::*; 517 use crate::config::Config; 518 use crate::test_util; 519 use tokio::time::{self, sleep, Duration}; 520 521 #[test] 522 fn ndb_init_works() { 523 let db = "target/testdbs/init_works"; 524 test_util::cleanup_db(db); 525 526 { 527 let cfg = Config::new(); 528 let _ = Ndb::new(db, &cfg).expect("ok"); 529 } 530 } 531 532 #[tokio::test] 533 async fn query_works() { 534 let db = "target/testdbs/query"; 535 test_util::cleanup_db(&db); 536 537 { 538 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 539 540 let filter = Filter::new().kinds(vec![1]).build(); 541 let filters = vec![filter]; 542 543 let sub = ndb.subscribe(&filters).expect("sub_id"); 544 let waiter = ndb.wait_for_notes(sub, 1); 545 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 546 let res = waiter.await.expect("await ok"); 547 assert_eq!(res, vec![NoteKey::new(1)]); 548 let txn = Transaction::new(&ndb).expect("txn"); 549 let res = ndb.query(&txn, &filters, 1).expect("query ok"); 550 assert_eq!(res.len(), 1); 551 assert_eq!( 552 hex::encode(res[0].note.id()), 553 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3" 554 ); 555 } 556 } 557 558 #[tokio::test] 559 async fn search_profile_works() { 560 let db = "target/testdbs/search_profile"; 561 test_util::cleanup_db(&db); 562 563 { 564 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 565 566 let filter = Filter::new().kinds(vec![0]).build(); 567 let filters = vec![filter]; 568 569 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 570 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 571 572 ndb.process_event(r#"["EVENT","b",{ "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c", "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24", "created_at": 1736785355, "kind": 0, "tags": [ [ "alt", "User profile for Derek Ross" ], [ "i", "twitter:derekmross", "1634343988407726081" ], [ "i", "github:derekross", "3edaf845975fa4500496a15039323fa3I" ] ], "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}", "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap(); 573 574 ndb.process_event(r#"["EVENT","b",{ "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf", "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967", "created_at": 1736017863, "kind": 0, "tags": [ [ "client", "Damus Notedeck" ] ], "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}", "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap(); 575 576 for _ in 0..2 { 577 let _ = sub.next().await; 578 } 579 let txn = Transaction::new(&ndb).expect("txn"); 580 581 let res = ndb.search_profile(&txn, "kernel", 1); 582 assert!(res.is_ok()); 583 let res = res.unwrap(); 584 assert!(res.len() >= 1); 585 let kernelkind_bytes: [u8; 32] = [ 586 0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57, 587 0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55, 588 0x5f, 0x57, 0x49, 0x67, 589 ]; 590 assert_eq!(kernelkind_bytes, **res.first().unwrap()); 591 592 let res = ndb.search_profile(&txn, "Derek", 1); 593 assert!(res.is_ok()); 594 let res = res.unwrap(); 595 assert!(res.len() >= 1); 596 let derek_bytes: [u8; 32] = [ 597 0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23, 598 0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87, 599 0x7a, 0x74, 0x5b, 0x24, 600 ]; 601 assert_eq!(derek_bytes, **res.first().unwrap()); 602 } 603 } 604 605 #[tokio::test] 606 async fn subscribe_event_works() { 607 let db = "target/testdbs/subscribe"; 608 test_util::cleanup_db(&db); 609 610 { 611 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 612 613 let filter = Filter::new().kinds(vec![1]).build(); 614 615 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 616 let waiter = ndb.wait_for_notes(sub, 1); 617 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 618 let res = waiter.await.expect("await ok"); 619 assert_eq!(res, vec![NoteKey::new(1)]); 620 } 621 } 622 623 #[tokio::test] 624 async fn multiple_events_work() { 625 let db = "target/testdbs/multiple_events"; 626 test_util::cleanup_db(&db); 627 628 { 629 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 630 631 let filter = Filter::new().kinds(vec![1]).build(); 632 633 let sub_id = ndb.subscribe(&[filter]).expect("sub_id"); 634 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 635 636 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 637 ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok"); 638 ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok"); 639 640 // this pause causes problems 641 sleep(Duration::from_millis(100)).await; 642 643 ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok"); 644 ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok"); 645 ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok"); 646 647 let timeout_duration = Duration::from_secs(2); 648 let result = time::timeout(timeout_duration, async { 649 let mut count = 0; 650 while count < 6 { 651 let res = sub.next(); 652 let _ = res.await.expect("await ok"); 653 count += 1; 654 println!("saw an event, count = {}", count); 655 } 656 }) 657 .await; 658 659 match result { 660 Ok(_) => println!("Test completed successfully"), 661 Err(_) => panic!("Test timed out"), 662 } 663 } 664 } 665 666 #[tokio::test] 667 async fn multiple_events_with_final_pause_work() { 668 let db = "target/testdbs/multiple_events_with_final_pause"; 669 test_util::cleanup_db(&db); 670 671 { 672 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 673 674 let filter = Filter::new().kinds(vec![1]).build(); 675 676 let sub_id = ndb.subscribe(&[filter]).expect("sub_id"); 677 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 678 679 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 680 ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok"); 681 ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok"); 682 683 sleep(Duration::from_millis(100)).await; 684 685 ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok"); 686 ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok"); 687 ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok"); 688 689 // this final pause causes extra problems 690 sleep(Duration::from_millis(100)).await; 691 692 let timeout_duration = Duration::from_secs(2); 693 let result = time::timeout(timeout_duration, async { 694 let mut count = 0; 695 while count < 6 { 696 let res = sub.next(); 697 let _ = res.await.expect("await ok"); 698 count += 1; 699 println!("saw an event, count = {}", count); 700 } 701 }) 702 .await; 703 704 match result { 705 Ok(_) => println!("Test completed successfully"), 706 Err(_) => panic!("Test timed out"), 707 } 708 } 709 } 710 711 #[test] 712 fn poll_note_works() { 713 let db = "target/testdbs/poll"; 714 test_util::cleanup_db(&db); 715 716 { 717 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 718 719 let filter = Filter::new().kinds(vec![1]).build(); 720 721 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 722 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 723 // this is too fast, we should have nothing 724 let res = ndb.poll_for_notes(sub, 1); 725 assert_eq!(res, vec![]); 726 727 std::thread::sleep(std::time::Duration::from_millis(150)); 728 // now we should have something 729 let res = ndb.poll_for_notes(sub, 1); 730 assert_eq!(res, vec![NoteKey::new(1)]); 731 } 732 } 733 734 #[test] 735 fn process_event_works() { 736 let db = "target/testdbs/event_works"; 737 test_util::cleanup_db(&db); 738 739 { 740 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 741 ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 742 } 743 744 { 745 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 746 let id = 747 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3") 748 .expect("hex id"); 749 let mut txn = Transaction::new(&ndb).expect("txn"); 750 let id_bytes: [u8; 32] = id.try_into().expect("id bytes"); 751 let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note"); 752 assert_eq!(note.kind(), 1); 753 } 754 } 755 756 #[test] 757 #[cfg(target_os = "windows")] 758 fn test_windows_large_mapsize() { 759 use std::{fs, path::Path}; 760 761 let db = "target/testdbs/windows_large_mapsize"; 762 test_util::cleanup_db(&db); 763 764 { 765 // 32 TiB should be way too big for CI 766 let config = 767 Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize); 768 769 // in this case, nostrdb should try to keep resizing to 770 // smaller mapsizes until success 771 772 let ndb = Ndb::new(db, &config); 773 774 assert!(ndb.is_ok()); 775 } 776 777 let file_len = fs::metadata(Path::new(db).join("data.mdb")) 778 .expect("metadata") 779 .len(); 780 781 assert!(file_len > 0); 782 783 if cfg!(target_os = "windows") { 784 // on windows the default mapsize will be 1MB when we fail 785 // to open it 786 assert_ne!(file_len, 1048576); 787 } else { 788 assert!(file_len < 1024u64 * 1024u64); 789 } 790 791 // we should definitely clean this up... especially on windows 792 test_util::cleanup_db(&db); 793 } 794 795 #[tokio::test] 796 async fn test_unsub_on_drop() { 797 let db = "target/testdbs/test_unsub_on_drop"; 798 test_util::cleanup_db(&db); 799 800 { 801 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 802 let sub_id = { 803 let filter = Filter::new().kinds(vec![1]).build(); 804 let filters = vec![filter]; 805 806 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 807 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 808 809 let res = sub.next(); 810 811 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 812 813 let res = res.await.expect("await ok"); 814 assert_eq!(res, vec![NoteKey::new(1)]); 815 816 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id)); 817 sub_id 818 }; 819 820 // ensure subscription state is removed after stream is dropped 821 assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id)); 822 assert_eq!(ndb.subscription_count(), 0); 823 } 824 825 test_util::cleanup_db(&db); 826 } 827 828 #[tokio::test] 829 async fn test_stream() { 830 let db = "target/testdbs/test_stream"; 831 test_util::cleanup_db(&db); 832 833 { 834 let mut ndb = Ndb::new(db, &Config::new()).expect("ndb"); 835 let sub_id = { 836 let filter = Filter::new().kinds(vec![1]).build(); 837 let filters = vec![filter]; 838 839 let sub_id = ndb.subscribe(&filters).expect("sub_id"); 840 let mut sub = sub_id.stream(&ndb).notes_per_await(1); 841 842 let res = sub.next(); 843 844 let _ = ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 845 846 let _ = ndb.process_event(r#"["EVENT",{"content":"👀","created_at":1761514455,"id":"66af95a6bdfec756344f48241562b684082ff9c76ea940c11c4fd85e91e1219c","kind":7,"pubkey":"d5805ae449e108e907091c67cdf49a9835b3cac3dd11489ad215c0ddf7c658fc","sig":"69f4a3fe7c1cc6aa9c9cc4a2e90e4b71c3b9afaad262e68b92336e0493ff1a748b5dcc20ab6e86d4551dc5ea680ddfa1c08d47f9e4845927e143e8ef2183479b","tags":[["e","d44ad96cb8924092a76bc2afddeb12eb85233c0d03a7d9adc42c2a85a79a4305","wss://relay.primal.net/","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],["p","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9","wss://relay.primal.net/"],["k","1"]]}]"#); 847 let res = res.await.expect("await ok"); 848 assert_eq!(res, vec![NoteKey::new(1)]); 849 850 // ensure that unsubscribing kills the stream 851 assert!(ndb.unsubscribe(sub_id).is_ok()); 852 assert!(sub.next().await.is_none()); 853 854 assert!(ndb.subs.lock().unwrap().contains_key(&sub_id)); 855 sub_id 856 }; 857 858 // ensure subscription state is removed after stream is dropped 859 assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id)); 860 } 861 862 test_util::cleanup_db(&db); 863 } 864 }