ndb.rs (14412B)
1 use std::ffi::CString; 2 use std::ptr; 3 4 use crate::{ 5 bindings, Blocks, Config, Error, Filter, Note, NoteKey, ProfileKey, ProfileRecord, QueryResult, 6 Result, Subscription, Transaction, 7 }; 8 use std::fs; 9 use std::os::raw::c_int; 10 use std::path::Path; 11 use std::sync::Arc; 12 use tokio::task; // Make sure to import the task module 13 14 #[derive(Debug)] 15 struct NdbRef { 16 ndb: *mut bindings::ndb, 17 } 18 19 /// It's safe to have multi-threaded references to this because thread safety 20 /// is guaranteed by LMDB 21 unsafe impl Send for NdbRef {} 22 unsafe impl Sync for NdbRef {} 23 24 /// The database is automatically closed when [Ndb] is [Drop]ped. 25 impl Drop for NdbRef { 26 fn drop(&mut self) { 27 unsafe { 28 bindings::ndb_destroy(self.ndb); 29 } 30 } 31 } 32 33 /// A nostrdb context. Construct one of these with [Ndb::new]. 34 #[derive(Debug, Clone)] 35 pub struct Ndb { 36 refs: Arc<NdbRef>, 37 } 38 39 impl Ndb { 40 /// Construct a new nostrdb context. Takes a directory where the database 41 /// is/will be located and a nostrdb config. 42 pub fn new(db_dir: &str, config: &Config) -> Result<Self> { 43 let db_dir_cstr = match CString::new(db_dir) { 44 Ok(cstr) => cstr, 45 Err(_) => return Err(Error::DbOpenFailed), 46 }; 47 let mut ndb: *mut bindings::ndb = ptr::null_mut(); 48 49 let path = Path::new(db_dir); 50 if !path.exists() { 51 let _ = fs::create_dir_all(path); 52 } 53 54 let result = unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) }; 55 56 if result == 0 { 57 return Err(Error::DbOpenFailed); 58 } 59 60 let refs = Arc::new(NdbRef { ndb }); 61 Ok(Ndb { refs }) 62 } 63 64 /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]` 65 /// This function returns immediately and doesn't provide any information on 66 /// if ingestion was successful or not. 67 pub fn process_event(&self, json: &str) -> Result<()> { 68 // Convert the Rust string to a C-style string 69 let c_json = CString::new(json).expect("CString::new failed"); 70 let c_json_ptr = c_json.as_ptr(); 71 72 // Get the length of the string 73 let len = json.len() as libc::c_int; 74 75 let res = unsafe { bindings::ndb_process_event(self.as_ptr(), c_json_ptr, len) }; 76 77 if res == 0 { 78 return Err(Error::NoteProcessFailed); 79 } 80 81 Ok(()) 82 } 83 84 pub fn query<'a>( 85 &self, 86 txn: &'a Transaction, 87 filters: Vec<Filter>, 88 max_results: i32, 89 ) -> Result<Vec<QueryResult<'a>>> { 90 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect(); 91 let mut out: Vec<bindings::ndb_query_result> = vec![]; 92 let mut returned: i32 = 0; 93 out.reserve_exact(max_results as usize); 94 let res = unsafe { 95 bindings::ndb_query( 96 txn.as_mut_ptr(), 97 ndb_filters.as_mut_ptr(), 98 ndb_filters.len() as i32, 99 out.as_mut_ptr(), 100 max_results, 101 &mut returned as *mut i32, 102 ) 103 }; 104 if res == 1 { 105 unsafe { 106 out.set_len(returned as usize); 107 }; 108 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect()) 109 } else { 110 Err(Error::QueryError) 111 } 112 } 113 114 pub fn subscribe(&self, filters: Vec<Filter>) -> Result<Subscription> { 115 unsafe { 116 let mut ndb_filters: Vec<bindings::ndb_filter> = 117 filters.iter().map(|a| a.data).collect(); 118 let id = bindings::ndb_subscribe( 119 self.as_ptr(), 120 ndb_filters.as_mut_ptr(), 121 filters.len() as i32, 122 ); 123 if id == 0 { 124 Err(Error::SubscriptionError) 125 } else { 126 Ok(Subscription { filters, id }) 127 } 128 } 129 } 130 131 pub fn poll_for_notes(&self, sub: &Subscription, max_notes: u32) -> Vec<NoteKey> { 132 let mut vec = vec![]; 133 vec.reserve_exact(max_notes as usize); 134 let sub_id = sub.id; 135 136 unsafe { 137 let res = bindings::ndb_poll_for_notes( 138 self.as_ptr(), 139 sub_id, 140 vec.as_mut_ptr(), 141 max_notes as c_int, 142 ); 143 vec.set_len(res as usize); 144 }; 145 146 vec.into_iter().map(NoteKey::new).collect() 147 } 148 149 pub async fn wait_for_notes(&self, sub: &Subscription, max_notes: u32) -> Result<Vec<NoteKey>> { 150 let ndb = self.clone(); 151 let sub_id = sub.id; 152 let handle = task::spawn_blocking(move || { 153 let mut vec: Vec<u64> = vec![]; 154 vec.reserve_exact(max_notes as usize); 155 let res = unsafe { 156 bindings::ndb_wait_for_notes( 157 ndb.as_ptr(), 158 sub_id, 159 vec.as_mut_ptr(), 160 max_notes as c_int, 161 ) 162 }; 163 if res == 0 { 164 Err(Error::SubscriptionError) 165 } else { 166 unsafe { 167 vec.set_len(res as usize); 168 }; 169 Ok(vec) 170 } 171 }); 172 173 match handle.await { 174 Ok(Ok(res)) => Ok(res.into_iter().map(NoteKey::new).collect()), 175 Ok(Err(err)) => Err(err), 176 Err(_) => Err(Error::SubscriptionError), 177 } 178 } 179 180 pub fn get_profile_by_key<'a>( 181 &self, 182 transaction: &'a Transaction, 183 key: ProfileKey, 184 ) -> Result<ProfileRecord<'a>> { 185 let mut len: usize = 0; 186 187 let profile_record_ptr = unsafe { 188 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len) 189 }; 190 191 if profile_record_ptr.is_null() { 192 // Handle null pointer (e.g., note not found or error occurred) 193 return Err(Error::NotFound); 194 } 195 196 // Convert the raw pointer to a Note instance 197 Ok(ProfileRecord::new_transactional( 198 profile_record_ptr, 199 len, 200 key, 201 transaction, 202 )) 203 } 204 205 pub fn get_profile_by_pubkey<'a>( 206 &self, 207 transaction: &'a Transaction, 208 id: &[u8; 32], 209 ) -> Result<ProfileRecord<'a>> { 210 let mut len: usize = 0; 211 let mut primkey: u64 = 0; 212 213 let profile_record_ptr = unsafe { 214 bindings::ndb_get_profile_by_pubkey( 215 transaction.as_mut_ptr(), 216 id.as_ptr(), 217 &mut len, 218 &mut primkey, 219 ) 220 }; 221 222 if profile_record_ptr.is_null() { 223 // Handle null pointer (e.g., note not found or error occurred) 224 return Err(Error::NotFound); 225 } 226 227 // Convert the raw pointer to a Note instance 228 Ok(ProfileRecord::new_transactional( 229 profile_record_ptr, 230 len, 231 ProfileKey::new(primkey), 232 transaction, 233 )) 234 } 235 236 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<u64> { 237 let res = unsafe { 238 bindings::ndb_get_notekey_by_id( 239 txn.as_mut_ptr(), 240 id.as_ptr() as *const ::std::os::raw::c_uchar, 241 ) 242 }; 243 244 if res == 0 { 245 return Err(Error::NotFound); 246 } 247 248 Ok(res) 249 } 250 251 pub fn get_blocks_by_key<'a>( 252 &self, 253 txn: &'a Transaction, 254 note_key: NoteKey, 255 ) -> Result<Blocks<'a>> { 256 let blocks_ptr = unsafe { 257 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64()) 258 }; 259 260 if blocks_ptr.is_null() { 261 return Err(Error::NotFound); 262 } 263 264 Ok(Blocks::new_transactional(blocks_ptr, txn)) 265 } 266 267 pub fn get_note_by_key<'a>( 268 &self, 269 transaction: &'a Transaction, 270 note_key: NoteKey, 271 ) -> Result<Note<'a>> { 272 let mut len: usize = 0; 273 274 let note_ptr = unsafe { 275 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len) 276 }; 277 278 if note_ptr.is_null() { 279 // Handle null pointer (e.g., note not found or error occurred) 280 return Err(Error::NotFound); 281 } 282 283 // Convert the raw pointer to a Note instance 284 Ok(Note::new_transactional( 285 note_ptr, 286 len, 287 note_key, 288 transaction, 289 )) 290 } 291 292 /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id 293 pub fn get_note_by_id<'a>( 294 &self, 295 transaction: &'a Transaction, 296 id: &[u8; 32], 297 ) -> Result<Note<'a>> { 298 let mut len: usize = 0; 299 let mut primkey: u64 = 0; 300 301 let note_ptr = unsafe { 302 bindings::ndb_get_note_by_id( 303 transaction.as_mut_ptr(), 304 id.as_ptr(), 305 &mut len, 306 &mut primkey, 307 ) 308 }; 309 310 if note_ptr.is_null() { 311 // Handle null pointer (e.g., note not found or error occurred) 312 return Err(Error::NotFound); 313 } 314 315 // Convert the raw pointer to a Note instance 316 Ok(Note::new_transactional( 317 note_ptr, 318 len, 319 NoteKey::new(primkey), 320 transaction, 321 )) 322 } 323 324 /// Get the underlying pointer to the context in C 325 pub fn as_ptr(&self) -> *mut bindings::ndb { 326 self.refs.ndb 327 } 328 } 329 330 #[cfg(test)] 331 mod tests { 332 use super::*; 333 use crate::config::Config; 334 use crate::test_util; 335 336 #[test] 337 fn ndb_init_works() { 338 let db = "target/testdbs/init_works"; 339 test_util::cleanup_db(db); 340 341 { 342 let cfg = Config::new(); 343 let _ = Ndb::new(db, &cfg).expect("ok"); 344 } 345 } 346 347 #[tokio::test] 348 async fn query_works() { 349 let db = "target/testdbs/query"; 350 test_util::cleanup_db(&db); 351 352 { 353 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 354 355 let filter = Filter::new().kinds(vec![1]).build(); 356 let filters = vec![filter]; 357 358 let sub = ndb.subscribe(filters.clone()).expect("sub_id"); 359 let waiter = ndb.wait_for_notes(&sub, 1); 360 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 361 let res = waiter.await.expect("await ok"); 362 assert_eq!(res, vec![NoteKey::new(1)]); 363 let txn = Transaction::new(&ndb).expect("txn"); 364 let res = ndb.query(&txn, filters, 1).expect("query ok"); 365 assert_eq!(res.len(), 1); 366 assert_eq!( 367 hex::encode(res[0].note.id()), 368 "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3" 369 ); 370 } 371 } 372 373 #[tokio::test] 374 async fn subscribe_event_works() { 375 let db = "target/testdbs/subscribe"; 376 test_util::cleanup_db(&db); 377 378 { 379 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 380 381 let filter = Filter::new().kinds(vec![1]).build(); 382 383 let sub = ndb.subscribe(vec![filter]).expect("sub_id"); 384 let waiter = ndb.wait_for_notes(&sub, 1); 385 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 386 let res = waiter.await.expect("await ok"); 387 assert_eq!(res, vec![NoteKey::new(1)]); 388 } 389 } 390 391 #[test] 392 fn poll_note_works() { 393 let db = "target/testdbs/poll"; 394 test_util::cleanup_db(&db); 395 396 { 397 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 398 399 let filter = Filter::new().kinds(vec![1]).build(); 400 401 let sub = ndb.subscribe(vec![filter]).expect("sub_id"); 402 ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 403 // this is too fast, we should have nothing 404 let res = ndb.poll_for_notes(&sub, 1); 405 assert_eq!(res, vec![]); 406 407 std::thread::sleep(std::time::Duration::from_millis(100)); 408 // now we should have something 409 let res = ndb.poll_for_notes(&sub, 1); 410 assert_eq!(res, vec![NoteKey::new(1)]); 411 } 412 } 413 414 #[test] 415 fn process_event_works() { 416 let db = "target/testdbs/event_works"; 417 test_util::cleanup_db(&db); 418 419 { 420 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 421 ndb.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok"); 422 } 423 424 { 425 let ndb = Ndb::new(db, &Config::new()).expect("ndb"); 426 let id = 427 hex::decode("702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3") 428 .expect("hex id"); 429 let mut txn = Transaction::new(&ndb).expect("txn"); 430 let id_bytes: [u8; 32] = id.try_into().expect("id bytes"); 431 let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note"); 432 assert_eq!(note.kind(), 1); 433 } 434 } 435 }