ndb.rs (17116B)
1 use std::ffi::CString; 2 use std::ptr; 3 4 use crate::{ 5 bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult, 6 Result, Subscription, Transaction, 7 }; 8 use std::fs; 9 use std::os::raw::c_int; 10 use std::path::Path; 11 use std::sync::Arc; 12 use tokio::task; // Make sure to import the task module 13 use tracing::debug; 14 15 #[derive(Debug)] 16 struct NdbRef { 17 ndb: *mut bindings::ndb, 18 19 /// Have we configured a rust closure for our callback? If so we need 20 /// to clean that up when this is dropped 21 has_rust_closure: bool, 22 rust_cb_ctx: *mut ::std::os::raw::c_void, 23 } 24 25 /// SAFETY: thread safety is ensured by nostrdb 26 unsafe impl Send for NdbRef {} 27 28 /// SAFETY: thread safety is ensured by nostrdb 29 unsafe impl Sync for NdbRef {} 30 31 /// The database is automatically closed when [Ndb] is [Drop]ped. 32 impl Drop for NdbRef { 33 fn drop(&mut self) { 34 unsafe { 35 bindings::ndb_destroy(self.ndb); 36 37 if self.has_rust_closure && !self.rust_cb_ctx.is_null() { 38 // Rebuild the Box from the raw pointer and drop it. 39 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>); 40 } 41 } 42 } 43 } 44 45 /// A nostrdb context. Construct one of these with [Ndb::new]. 46 #[derive(Debug, Clone)] 47 pub struct Ndb { 48 refs: Arc<NdbRef>, 49 } 50 51 impl Ndb { 52 /// Construct a new nostrdb context. Takes a directory where the database 53 /// is/will be located and a nostrdb config. 54 pub fn new(db_dir: &str, config: &Config) -> Result<Self> { 55 let db_dir_cstr = match CString::new(db_dir) { 56 Ok(cstr) => cstr, 57 Err(_) => return Err(Error::DbOpenFailed), 58 }; 59 let mut ndb: *mut bindings::ndb = ptr::null_mut(); 60 61 let path = Path::new(db_dir); 62 if !path.exists() { 63 let _ = fs::create_dir_all(path); 64 } 65 66 let min_mapsize = 1024 * 1024 * 512; 67 let mut mapsize = config.config.mapsize; 68 let mut config = *config; 69 70 let result = loop { 71 let result = 72 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) }; 73 74 if result == 0 { 75 mapsize /= 2; 76 config = config.set_mapsize(mapsize); 77 debug!("ndb init failed, reducing mapsize to {}", mapsize); 78 79 if mapsize > min_mapsize { 80 continue; 81 } else { 82 break 0; 83 } 84 } else { 85 break result; 86 } 87 }; 88 89 if result == 0 { 90 return Err(Error::DbOpenFailed); 91 } 92 93 let has_rust_closure = !config.config.sub_cb_ctx.is_null(); 94 let rust_cb_ctx = config.config.sub_cb_ctx; 95 let refs = Arc::new(NdbRef { 96 ndb, 97 has_rust_closure, 98 rust_cb_ctx, 99 }); 100 101 Ok(Ndb { refs }) 102 } 103 104 /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]` 105 /// This function returns immediately and doesn't provide any information on 106 /// if ingestion was successful or not. 107 pub fn process_event(&self, json: &str) -> Result<()> { 108 // Convert the Rust string to a C-style string 109 let c_json = CString::new(json).expect("CString::new failed"); 110 let c_json_ptr = c_json.as_ptr(); 111 112 // Get the length of the string 113 let len = json.len() as libc::c_int; 114 115 let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) }; 116 117 if res == 0 { 118 return Err(Error::NoteProcessFailed); 119 } 120 121 Ok(()) 122 } 123 124 pub fn query<'a>( 125 &self, 126 txn: &'a Transaction, 127 filters: &[Filter], 128 max_results: i32, 129 ) -> Result<Vec<QueryResult<'a>>> { 130 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect(); 131 let mut out: Vec<bindings::ndb_query_result> = vec![]; 132 let mut returned: i32 = 0; 133 out.reserve_exact(max_results as usize); 134 let res = unsafe { 135 bindings::ndb_query( 136 txn.as_mut_ptr(), 137 ndb_filters.as_mut_ptr(), 138 ndb_filters.len() as i32, 139 out.as_mut_ptr(), 140 max_results, 141 &mut returned as *mut i32, 142 ) 143 }; 144 if res == 1 { 145 unsafe { 146 out.set_len(returned as usize); 147 }; 148 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect()) 149 } else { 150 Err(Error::QueryError) 151 } 152 } 153 154 pub fn subscription_count(&self) -> u32 { 155 unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 } 156 } 157 158 pub fn unsubscribe(&self, sub: Subscription) -> Result<()> { 159 let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) }; 160 161 if r == 0 { 162 Err(Error::SubscriptionError) 163 } else { 164 Ok(()) 165 } 166 } 167 168 pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> { 169 unsafe { 170 let mut ndb_filters: Vec<bindings::ndb_filter> = 171 filters.iter().map(|a| a.data).collect(); 172 let id = bindings::ndb_subscribe( 173 self.as_ptr(), 174 ndb_filters.as_mut_ptr(), 175 filters.len() as i32, 176 ); 177 if id == 0 { 178 Err(Error::SubscriptionError) 179 } else { 180 Ok(Subscription::new(id)) 181 } 182 } 183 } 184 185 pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> { 186 let mut vec = vec![]; 187 vec.reserve_exact(max_notes as usize); 188 189 unsafe { 190 let res = bindings::ndb_poll_for_notes( 191 self.as_ptr(), 192 sub.id(), 193 vec.as_mut_ptr(), 194 max_notes as c_int, 195 ); 196 vec.set_len(res as usize); 197 }; 198 199 vec.into_iter().map(NoteKey::new).collect() 200 } 201 202 pub async fn wait_for_notes( 203 &self, 204 sub_id: Subscription, 205 max_notes: u32, 206 ) -> Result<Vec<NoteKey>> { 207 let ndb = self.clone(); 208 let handle = task::spawn_blocking(move || { 209 let mut vec: Vec<u64> = vec![]; 210 vec.reserve_exact(max_notes as usize); 211 let res = unsafe { 212 bindings::ndb_wait_for_notes( 213 ndb.as_ptr(), 214 sub_id.id(), 215 vec.as_mut_ptr(), 216 max_notes as c_int, 217 ) 218 }; 219 if res == 0 { 220 Err(Error::SubscriptionError) 221 } else { 222 unsafe { 223 vec.set_len(res as usize); 224 }; 225 Ok(vec) 226 } 227 }); 228 229 match handle.await { 230 Ok(Ok(res)) => Ok(res.into_iter().map(NoteKey::new).collect()), 231 Ok(Err(err)) => Err(err), 232 Err(_) => Err(Error::SubscriptionError), 233 } 234 } 235 236 pub fn get_profile_by_key<'a>( 237 &self, 238 transaction: &'a Transaction, 239 key: ProfileKey, 240 ) -> Result<ProfileRecord<'a>> { 241 let mut len: usize = 0; 242 243 let profile_record_ptr = unsafe { 244 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len) 245 }; 246 247 if profile_record_ptr.is_null() { 248 // Handle null pointer (e.g., note not found or error occurred) 249 return Err(Error::NotFound); 250 } 251 252 // Convert the raw pointer to a Note instance 253 Ok(ProfileRecord::new_transactional( 254 profile_record_ptr, 255 len, 256 key, 257 transaction, 258 )) 259 } 260 261 pub fn get_profile_by_pubkey<'a>( 262 &self, 263 transaction: &'a Transaction, 264 id: &[u8; 32], 265 ) -> Result<ProfileRecord<'a>> { 266 let mut len: usize = 0; 267 let mut primkey: u64 = 0; 268 269 let profile_record_ptr = unsafe { 270 bindings::ndb_get_profile_by_pubkey( 271 transaction.as_mut_ptr(), 272 id.as_ptr(), 273 &mut len, 274 &mut primkey, 275 ) 276 }; 277 278 if profile_record_ptr.is_null() { 279 // Handle null pointer (e.g., note not found or error occurred) 280 return Err(Error::NotFound); 281 } 282 283 // Convert the raw pointer to a Note instance 284 Ok(ProfileRecord::new_transactional( 285 profile_record_ptr, 286 len, 287 ProfileKey::new(primkey), 288 transaction, 289 )) 290 } 291 292 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> { 293 let res = unsafe { 294 bindings::ndb_get_notekey_by_id( 295 txn.as_mut_ptr(), 296 id.as_ptr() as *const ::std::os::raw::c_uchar, 297 ) 298 }; 299 300 if res == 0 { 301 return Err(Error::NotFound); 302 } 303 304 Ok(res) 305 } 306 307 pub fn get_blocks_by_key<'a>( 308 &self, 309 txn: &'a Transaction, 310 note_key: NoteKey, 311 ) -> Result<Blocks<'a>> { 312 let blocks_ptr = unsafe { 313 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64()) 314 }; 315 316 if blocks_ptr.is_null() { 317 return Err(Error::NotFound); 318 } 319 320 Ok(Blocks::new_transactional(blocks_ptr, txn)) 321 } 322 323 pub fn get_note_by_key<'a>( 324 &self, 325 transaction: &'a Transaction, 326 note_key: NoteKey, 327 ) -> Result<Note<'a>> { 328 let mut len: usize = 0; 329 330 let note_ptr = unsafe { 331 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len) 332 }; 333 334 if note_ptr.is_null() { 335 // Handle null pointer (e.g., note not found or error occurred) 336 return Err(Error::NotFound); 337 } 338 339 // Convert the raw pointer to a Note instance 340 Ok(Note::new_transactional( 341 note_ptr, 342 len, 343 note_key, 344 transaction, 345 )) 346 } 347 348 /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id 349 pub fn get_note_by_id<'a>( 350 &self, 351 transaction: &'a Transaction, 352 id: &[u8; 32], 353 ) -> Result<Note<'a>> { 354 let mut len: usize = 0; 355 let mut primkey: u64 = 0; 356 357 let note_ptr = unsafe { 358 bindings::ndb_get_note_by_id( 359 transaction.as_mut_ptr(), 360 id.as_ptr(), 361 &mut len, 362 &mut primkey, 363 ) 364 }; 365 366 if note_ptr.is_null() { 367 // Handle null pointer (e.g., note not found or error occurred) 368 return Err(Error::NotFound); 369 } 370 371 // Convert the raw pointer to a Note instance 372 Ok(Note::new_transactional( 373 note_ptr, 374 len, 375 NoteKey::new(primkey), 376 transaction, 377 )) 378 } 379 380 /// Get the underlying pointer to the context in C 381 pub fn as_ptr(&self) -> *mut bindings::ndb { 382 self.refs.ndb 383 } 384 } 385 386 #[cfg(test)] 387 mod tests { 388 use super::*; 389 use crate::config::Config; 390 use crate::test_util; 391 392 #[test] 393 fn ndb_init_works() { 394 let db = "target/testdbs/init_works"; 395 test_util::cleanup_db(db); 396 397 { 398 let cfg = Config::new(); 399 let _ = Ndb::new(db, &cfg).expect("ok"); 400 } 401 } 402 403 #[tokio::test] 404 async fn query_works() { 405 let db = "target/testdbs/query"; 406 test_util::cleanup_db(&db); 407 408 { 409 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 410 411 let filter = Filter::new().kinds(vec![1]).build(); 412 let filters = vec![filter]; 413 414 let sub = ndb.subscribe(&filters).expect("sub_id"); 415 let waiter = ndb.wait_for_notes(sub, 1); 416 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 417 let res = waiter.await.expect("await ok"); 418 assert_eq!(res, vec![NoteKey::new(1)]); 419 let txn = Transaction::new(&ndb).expect("txn"); 420 let res = ndb.query(&txn, &filters, 1).expect("query ok"); 421 assert_eq!(res.len(), 1); 422 assert_eq!( 423 hex::encode(res[0].note.id()), 424 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3" 425 ); 426 } 427 } 428 429 #[tokio::test] 430 async fn subscribe_event_works() { 431 let db = "target/testdbs/subscribe"; 432 test_util::cleanup_db(&db); 433 434 { 435 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 436 437 let filter = Filter::new().kinds(vec![1]).build(); 438 439 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 440 let waiter = ndb.wait_for_notes(sub, 1); 441 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 442 let res = waiter.await.expect("await ok"); 443 assert_eq!(res, vec![NoteKey::new(1)]); 444 } 445 } 446 447 #[test] 448 fn poll_note_works() { 449 let db = "target/testdbs/poll"; 450 test_util::cleanup_db(&db); 451 452 { 453 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 454 455 let filter = Filter::new().kinds(vec![1]).build(); 456 457 let sub = ndb.subscribe(&[filter]).expect("sub_id"); 458 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 459 // this is too fast, we should have nothing 460 let res = ndb.poll_for_notes(sub, 1); 461 assert_eq!(res, vec![]); 462 463 std::thread::sleep(std::time::Duration::from_millis(100)); 464 // now we should have something 465 let res = ndb.poll_for_notes(sub, 1); 466 assert_eq!(res, vec![NoteKey::new(1)]); 467 } 468 } 469 470 #[test] 471 fn process_event_works() { 472 let db = "target/testdbs/event_works"; 473 test_util::cleanup_db(&db); 474 475 { 476 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 477 ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 478 } 479 480 { 481 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 482 let id = 483 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3") 484 .expect("hex id"); 485 let mut txn = Transaction::new(&ndb).expect("txn"); 486 let id_bytes: [u8; 32] = id.try_into().expect("id bytes"); 487 let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note"); 488 assert_eq!(note.kind(), 1); 489 } 490 } 491 492 #[test] 493 #[cfg(target_os = "windows")] 494 fn test_windows_large_mapsize() { 495 use std::{fs, path::Path}; 496 497 let db = "target/testdbs/windows_large_mapsize"; 498 test_util::cleanup_db(&db); 499 500 { 501 // 32 TiB should be way too big for CI 502 let config = 503 Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize); 504 505 // in this case, nostrdb should try to keep resizing to 506 // smaller mapsizes until success 507 508 let ndb = Ndb::new(db, &config); 509 510 assert!(ndb.is_ok()); 511 } 512 513 let file_len = fs::metadata(Path::new(db).join("data.mdb")) 514 .expect("metadata") 515 .len(); 516 517 assert!(file_len > 0); 518 519 if cfg!(target_os = "windows") { 520 // on windows the default mapsize will be 1MB when we fail 521 // to open it 522 assert_ne!(file_len, 1048576); 523 } else { 524 assert!(file_len < 1024u64 * 1024u64); 525 } 526 527 // we should definitely clean this up... especially on windows 528 test_util::cleanup_db(&db); 529 } 530 }