ndb.rs (14726B)
1 use std::ffi::CString; 2 use std::ptr; 3 4 use crate::{ 5 bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult, 6 Result, Subscription, Transaction, 7 }; 8 use std::fs; 9 use std::os::raw::c_int; 10 use std::path::Path; 11 use std::sync::Arc; 12 use tokio::task; // Make sure to import the task module 13 14 #[derive(Debug)] 15 struct NdbRef { 16 ndb: *mut bindings::ndb, 17 } 18 19 /// It's safe to have multi-threaded references to this because thread safety 20 /// is guaranteed by LMDB 21 unsafe impl Send for NdbRef {} 22 unsafe impl Sync for NdbRef {} 23 24 /// The database is automatically closed when [Ndb] is [Drop]ped. 25 impl Drop for NdbRef { 26 fn drop(&mut self) { 27 unsafe { 28 bindings::ndb_destroy(self.ndb); 29 } 30 } 31 } 32 33 /// A nostrdb context. Construct one of these with [Ndb::new]. 34 #[derive(Debug, Clone)] 35 pub struct Ndb { 36 refs: Arc<NdbRef>, 37 } 38 39 impl Ndb { 40 /// Construct a new nostrdb context. Takes a directory where the database 41 /// is/will be located and a nostrdb config. 42 pub fn new(db_dir: &str, config: &Config) -> Result<Self> { 43 let db_dir_cstr = match CString::new(db_dir) { 44 Ok(cstr) => cstr, 45 Err(_) => return Err(Error::DbOpenFailed), 46 }; 47 let mut ndb: *mut bindings::ndb = ptr::null_mut(); 48 49 let path = Path::new(db_dir); 50 if !path.exists() { 51 let _ = fs::create_dir_all(path); 52 } 53 54 let result = unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) }; 55 56 if result == 0 { 57 return Err(Error::DbOpenFailed); 58 } 59 60 let refs = Arc::new(NdbRef { ndb }); 61 Ok(Ndb { refs }) 62 } 63 64 /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]` 65 /// This function returns immediately and doesn't provide any information on 66 /// if ingestion was successful or not. 67 pub fn process_event(&self, json: &str) -> Result<()> { 68 // Convert the Rust string to a C-style string 69 let c_json = CString::new(json).expect("CString::new failed"); 70 let c_json_ptr = c_json.as_ptr(); 71 72 // Get the length of the string 73 let len = json.len() as libc::c_int; 74 75 let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) }; 76 77 if res == 0 { 78 return Err(Error::NoteProcessFailed); 79 } 80 81 Ok(()) 82 } 83 84 pub fn query<'a>( 85 &self, 86 txn: &'a Transaction, 87 filters: Vec<Filter>, 88 max_results: i32, 89 ) -> Result<Vec<QueryResult<'a>>> { 90 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect(); 91 let mut out: Vec<bindings::ndb_query_result> = vec![]; 92 let mut returned: i32 = 0; 93 out.reserve_exact(max_results as usize); 94 let res = unsafe { 95 bindings::ndb_query( 96 txn.as_mut_ptr(), 97 ndb_filters.as_mut_ptr(), 98 ndb_filters.len() as i32, 99 out.as_mut_ptr(), 100 max_results, 101 &mut returned as *mut i32, 102 ) 103 }; 104 if res == 1 { 105 unsafe { 106 out.set_len(returned as usize); 107 }; 108 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect()) 109 } else { 110 Err(Error::QueryError) 111 } 112 } 113 114 pub fn subscription_count(&self) -> u32 { 115 unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 } 116 } 117 118 pub fn unsubscribe(&self, sub_id: u64) -> Result<()> { 119 let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub_id) }; 120 121 if r == 0 { 122 Err(Error::SubscriptionError) 123 } else { 124 Ok(()) 125 } 126 } 127 128 pub fn subscribe(&self, filters: Vec<Filter>) -> Result<Subscription> { 129 unsafe { 130 let mut ndb_filters: Vec<bindings::ndb_filter> = 131 filters.iter().map(|a| a.data).collect(); 132 let id = bindings::ndb_subscribe( 133 self.as_ptr(), 134 ndb_filters.as_mut_ptr(), 135 filters.len() as i32, 136 ); 137 if id == 0 { 138 Err(Error::SubscriptionError) 139 } else { 140 Ok(Subscription { filters, id }) 141 } 142 } 143 } 144 145 pub fn poll_for_notes(&self, sub_id: u64, max_notes: u32) -> Vec<NoteKey> { 146 let mut vec = vec![]; 147 vec.reserve_exact(max_notes as usize); 148 149 unsafe { 150 let res = bindings::ndb_poll_for_notes( 151 self.as_ptr(), 152 sub_id, 153 vec.as_mut_ptr(), 154 max_notes as c_int, 155 ); 156 vec.set_len(res as usize); 157 }; 158 159 vec.into_iter().map(NoteKey::new).collect() 160 } 161 162 pub async fn wait_for_notes(&self, sub_id: u64, max_notes: u32) -> Result<Vec<NoteKey>> { 163 let ndb = self.clone(); 164 let handle = task::spawn_blocking(move || { 165 let mut vec: Vec<u64> = vec![]; 166 vec.reserve_exact(max_notes as usize); 167 let res = unsafe { 168 bindings::ndb_wait_for_notes( 169 ndb.as_ptr(), 170 sub_id, 171 vec.as_mut_ptr(), 172 max_notes as c_int, 173 ) 174 }; 175 if res == 0 { 176 Err(Error::SubscriptionError) 177 } else { 178 unsafe { 179 vec.set_len(res as usize); 180 }; 181 Ok(vec) 182 } 183 }); 184 185 match handle.await { 186 Ok(Ok(res)) => Ok(res.into_iter().map(NoteKey::new).collect()), 187 Ok(Err(err)) => Err(err), 188 Err(_) => Err(Error::SubscriptionError), 189 } 190 } 191 192 pub fn get_profile_by_key<'a>( 193 &self, 194 transaction: &'a Transaction, 195 key: ProfileKey, 196 ) -> Result<ProfileRecord<'a>> { 197 let mut len: usize = 0; 198 199 let profile_record_ptr = unsafe { 200 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len) 201 }; 202 203 if profile_record_ptr.is_null() { 204 // Handle null pointer (e.g., note not found or error occurred) 205 return Err(Error::NotFound); 206 } 207 208 // Convert the raw pointer to a Note instance 209 Ok(ProfileRecord::new_transactional( 210 profile_record_ptr, 211 len, 212 key, 213 transaction, 214 )) 215 } 216 217 pub fn get_profile_by_pubkey<'a>( 218 &self, 219 transaction: &'a Transaction, 220 id: &[u8; 32], 221 ) -> Result<ProfileRecord<'a>> { 222 let mut len: usize = 0; 223 let mut primkey: u64 = 0; 224 225 let profile_record_ptr = unsafe { 226 bindings::ndb_get_profile_by_pubkey( 227 transaction.as_mut_ptr(), 228 id.as_ptr(), 229 &mut len, 230 &mut primkey, 231 ) 232 }; 233 234 if profile_record_ptr.is_null() { 235 // Handle null pointer (e.g., note not found or error occurred) 236 return Err(Error::NotFound); 237 } 238 239 // Convert the raw pointer to a Note instance 240 Ok(ProfileRecord::new_transactional( 241 profile_record_ptr, 242 len, 243 ProfileKey::new(primkey), 244 transaction, 245 )) 246 } 247 248 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> { 249 let res = unsafe { 250 bindings::ndb_get_notekey_by_id( 251 txn.as_mut_ptr(), 252 id.as_ptr() as *const ::std::os::raw::c_uchar, 253 ) 254 }; 255 256 if res == 0 { 257 return Err(Error::NotFound); 258 } 259 260 Ok(res) 261 } 262 263 pub fn get_blocks_by_key<'a>( 264 &self, 265 txn: &'a Transaction, 266 note_key: NoteKey, 267 ) -> Result<Blocks<'a>> { 268 let blocks_ptr = unsafe { 269 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64()) 270 }; 271 272 if blocks_ptr.is_null() { 273 return Err(Error::NotFound); 274 } 275 276 Ok(Blocks::new_transactional(blocks_ptr, txn)) 277 } 278 279 pub fn get_note_by_key<'a>( 280 &self, 281 transaction: &'a Transaction, 282 note_key: NoteKey, 283 ) -> Result<Note<'a>> { 284 let mut len: usize = 0; 285 286 let note_ptr = unsafe { 287 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len) 288 }; 289 290 if note_ptr.is_null() { 291 // Handle null pointer (e.g., note not found or error occurred) 292 return Err(Error::NotFound); 293 } 294 295 // Convert the raw pointer to a Note instance 296 Ok(Note::new_transactional( 297 note_ptr, 298 len, 299 note_key, 300 transaction, 301 )) 302 } 303 304 /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id 305 pub fn get_note_by_id<'a>( 306 &self, 307 transaction: &'a Transaction, 308 id: &[u8; 32], 309 ) -> Result<Note<'a>> { 310 let mut len: usize = 0; 311 let mut primkey: u64 = 0; 312 313 let note_ptr = unsafe { 314 bindings::ndb_get_note_by_id( 315 transaction.as_mut_ptr(), 316 id.as_ptr(), 317 &mut len, 318 &mut primkey, 319 ) 320 }; 321 322 if note_ptr.is_null() { 323 // Handle null pointer (e.g., note not found or error occurred) 324 return Err(Error::NotFound); 325 } 326 327 // Convert the raw pointer to a Note instance 328 Ok(Note::new_transactional( 329 note_ptr, 330 len, 331 NoteKey::new(primkey), 332 transaction, 333 )) 334 } 335 336 /// Get the underlying pointer to the context in C 337 pub fn as_ptr(&self) -> *mut bindings::ndb { 338 self.refs.ndb 339 } 340 } 341 342 #[cfg(test)] 343 mod tests { 344 use super::*; 345 use crate::config::Config; 346 use crate::test_util; 347 348 #[test] 349 fn ndb_init_works() { 350 let db = "target/testdbs/init_works"; 351 test_util::cleanup_db(db); 352 353 { 354 let cfg = Config::new(); 355 let _ = Ndb::new(db, &cfg).expect("ok"); 356 } 357 } 358 359 #[tokio::test] 360 async fn query_works() { 361 let db = "target/testdbs/query"; 362 test_util::cleanup_db(&db); 363 364 { 365 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 366 367 let filter = Filter::new().kinds(vec![1]).build(); 368 let filters = vec![filter]; 369 370 let sub = ndb.subscribe(filters.clone()).expect("sub_id"); 371 let waiter = ndb.wait_for_notes(sub.id, 1); 372 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 373 let res = waiter.await.expect("await ok"); 374 assert_eq!(res, vec![NoteKey::new(1)]); 375 let txn = Transaction::new(&ndb).expect("txn"); 376 let res = ndb.query(&txn, filters, 1).expect("query ok"); 377 assert_eq!(res.len(), 1); 378 assert_eq!( 379 hex::encode(res[0].note.id()), 380 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3" 381 ); 382 } 383 } 384 385 #[tokio::test] 386 async fn subscribe_event_works() { 387 let db = "target/testdbs/subscribe"; 388 test_util::cleanup_db(&db); 389 390 { 391 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 392 393 let filter = Filter::new().kinds(vec![1]).build(); 394 395 let sub = ndb.subscribe(vec![filter]).expect("sub_id"); 396 let waiter = ndb.wait_for_notes(sub.id, 1); 397 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 398 let res = waiter.await.expect("await ok"); 399 assert_eq!(res, vec![NoteKey::new(1)]); 400 } 401 } 402 403 #[test] 404 fn poll_note_works() { 405 let db = "target/testdbs/poll"; 406 test_util::cleanup_db(&db); 407 408 { 409 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 410 411 let filter = Filter::new().kinds(vec![1]).build(); 412 413 let sub = ndb.subscribe(vec![filter]).expect("sub_id"); 414 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 415 // this is too fast, we should have nothing 416 let res = ndb.poll_for_notes(sub.id, 1); 417 assert_eq!(res, vec![]); 418 419 std::thread::sleep(std::time::Duration::from_millis(100)); 420 // now we should have something 421 let res = ndb.poll_for_notes(sub.id, 1); 422 assert_eq!(res, vec![NoteKey::new(1)]); 423 } 424 } 425 426 #[test] 427 fn process_event_works() { 428 let db = "target/testdbs/event_works"; 429 test_util::cleanup_db(&db); 430 431 { 432 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 433 ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 434 } 435 436 { 437 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 438 let id = 439 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3") 440 .expect("hex id"); 441 let mut txn = Transaction::new(&ndb).expect("txn"); 442 let id_bytes: [u8; 32] = id.try_into().expect("id bytes"); 443 let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note"); 444 assert_eq!(note.kind(), 1); 445 } 446 } 447 }