Ndb.swift (39268B)
//
//  Ndb.swift
//  damus
//
//  Created by William Casarin on 2023-08-25.
//

import Foundation
import OSLog

fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus"

/// Ordering applied to full-text search results.
enum NdbSearchOrder {
    case oldest_first
    case newest_first
}


/// Errors raised when the database cannot be brought up.
///
/// Conforms to `LocalizedError` so that `errorDescription` is actually surfaced
/// through `localizedDescription`.
enum DatabaseError: LocalizedError {
    case failed_open

    var errorDescription: String? {
        switch self {
        case .failed_open:
            return "Failed to open database"
        }
    }
}

/// Swift wrapper around a nostrdb (`ndb_t`) handle.
///
/// Owns the handle for its lifetime: `close()` (also called from `deinit`) destroys it,
/// and `reopen()` creates a fresh one at the same path.
class Ndb {
    var ndb: ndb_t
    let path: String?
    let owns_db: Bool
    /// Incremented every time the database is closed.
    var generation: Int
    private var closed: Bool
    private var callbackHandler: Ndb.CallbackHandler

    /// `true` when `close()` has been called or when there is no underlying handle at all
    /// (e.g. `Ndb.empty`).
    var is_closed: Bool {
        self.closed || self.ndb.ndb == nil
    }

    /// Deletes the database files at the default location and opens a fresh database there.
    ///
    /// - Returns: The freshly opened database, or `nil` if no path could be resolved or opening failed.
    static func safemode() -> Ndb? {
        guard let path = db_path ?? old_db_path else { return nil }

        // Delete the database and start fresh.
        // Each file is removed independently so that a partial set of files
        // (e.g. only `data.mdb`) does not survive into the "fresh" database.
        let file_manager = FileManager.default
        for db_file in db_files {
            try? file_manager.removeItem(atPath: "\(path)/\(db_file)")
        }

        return Ndb(path: path)
    }

    // NostrDB used to be stored on the app container's document directory
    static private var old_db_path: String? {
        guard let path = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first?.absoluteString else {
            return nil
        }
        return remove_file_prefix(path)
    }

    static var db_path: String? {
        // Use the `group.com.damus` container, so that it can be accessible from other targets
        // e.g. The notification service extension needs to access Ndb data, which is done through this shared file container.
        guard let containerURL = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: APPLICATION_GROUP_IDENTIFIER) else {
            return nil
        }
        return remove_file_prefix(containerURL.absoluteString)
    }

    /// File names that make up a database directory.
    static private let db_files: [String] = ["data.mdb", "lock.mdb"]

    /// An instance with no underlying database handle.
    static var empty: Ndb {
        print("txn: NOSTRDB EMPTY")
        return Ndb(ndb: ndb_t(ndb: nil))
    }

    /// Opens (or creates) a nostrdb database and hooks up the subscription callback.
    ///
    /// - Parameters:
    ///   - path: Directory holding the database files. `nil` selects `db_path`.
    ///   - owns_db_file: Whether the caller owns the DB files. When `false`, the database is only
    ///     opened if the files already exist at `db_path`.
    ///   - callbackHandler: Receives subscription callbacks from nostrdb. Passed to C *unretained*.
    /// - Returns: The opened handle, or `nil` if no path could be resolved or `ndb_init` kept failing.
    static func open(path: String? = nil, owns_db_file: Bool = true, callbackHandler: Ndb.CallbackHandler) -> ndb_t? {
        var ndb_p: OpaquePointer? = nil

        let ingest_threads: Int32 = 4
        var mapsize: Int = 1024 * 1024 * 1024 * 32

        if path == nil && owns_db_file {
            // `nil` path indicates the default path will be used.
            // The default path changed over time, so migrate the database to the new location if needed
            do {
                try Self.migrate_db_location_if_needed()
            }
            catch {
                // If it fails to migrate, the app can still run without serious consequences. Log instead.
                Log.error("Error migrating NostrDB to new file container", for: .storage)
            }
        }

        // NOTE(review): this existence check always looks at the default `db_path`, even when a
        // custom `path` is supplied — confirm that this is intended.
        guard let db_path = Self.db_path,
              owns_db_file || Self.db_files_exist(path: db_path) else {
            return nil      // If the caller claims to not own the DB file, and the DB files do not exist, then we should not initialize Ndb
        }

        guard let path = path.map(remove_file_prefix) ?? Ndb.db_path else {
            return nil
        }

        let ok = path.withCString { testdir in
            var ok = false
            // Retry with a progressively smaller map size until init succeeds or the size gets too small.
            while !ok && mapsize > 1024 * 1024 * 700 {
                var cfg = ndb_config(flags: 0, ingester_threads: ingest_threads, mapsize: mapsize, filter_context: nil, ingest_filter: nil, sub_cb_ctx: nil, sub_cb: nil)

                // Here we hook up the global callback function for subscription callbacks.
                // We do an "unretained" pass here because the lifetime of the callback handler is larger than the lifetime of the nostrdb monitor in the C code.
                // The NostrDB monitor that makes the callbacks should in theory _never_ outlive the callback handler.
                //
                // This means that:
                // - for as long as nostrdb is running, its parent Ndb instance will be alive, keeping the callback handler alive.
                // - when the Ndb instance is deinitialized — and the callback handler comes down with it — the `deinit` function will destroy the nostrdb monitor, preventing it from accessing freed memory.
                //
                // Therefore, we do not need to increase reference count to callbackHandler. The tightly coupled lifetimes will ensure that it is always alive when the ndb_monitor is alive.
                let ctx: UnsafeMutableRawPointer = Unmanaged.passUnretained(callbackHandler).toOpaque()
                ndb_config_set_subscription_callback(&cfg, subscription_callback, ctx)

                let res = ndb_init(&ndb_p, testdir, &cfg);
                ok = res != 0;
                if !ok {
                    Log.error("ndb_init failed: %d, reducing mapsize from %d to %d", for: .storage, res, mapsize, mapsize / 2)
                    mapsize /= 2
                }
            }
            return ok
        }

        if !ok {
            return nil
        }

        let ndb_instance = ndb_t(ndb: ndb_p)
        // The handler is an actor, so it learns about the handle asynchronously.
        Task { await callbackHandler.set(ndb: ndb_instance) }
        return ndb_instance
    }

    /// Opens the database at `path` (or the default location when `nil`).
    /// Fails when `Ndb.open` fails.
    init?(path: String? = nil, owns_db_file: Bool = true) {
        let callbackHandler = Ndb.CallbackHandler()
        guard let db = Self.open(path: path, owns_db_file: owns_db_file, callbackHandler: callbackHandler) else {
            return nil
        }

        self.generation = 0
        self.path = path
        self.owns_db = owns_db_file
        self.ndb = db
        self.closed = false
        self.callbackHandler = callbackHandler
    }

    /// Moves the database files from the old documents directory to the shared container if needed.
    ///
    /// - Throws: `Errors.cannot_find_db_path` if either path cannot be resolved,
    ///   `Errors.db_file_migration_error` if moving a file fails.
    private static func migrate_db_location_if_needed() throws {
        guard let old_db_path, let db_path else {
            throw Errors.cannot_find_db_path
        }

        let file_manager = FileManager.default

        let old_db_files_exist = Self.db_files_exist(path: old_db_path)
        let new_db_files_exist = Self.db_files_exist(path: db_path)

        // Migration rules:
        // 1. If DB files exist in the old path but not the new one, move files to the new path
        // 2. If files do not exist anywhere, do nothing (let new DB be initialized)
        // 3. If files exist in the new path, but not the old one, nothing needs to be done
        // 4. If files exist on both, do nothing.
        // Scenario 4 likely means that user has downgraded and re-upgraded.
        // Although it might make sense to get the most recent DB, it might lead to data loss.
        // If we leave both intact, it makes it easier to fix later, as no data loss would occur.
        if old_db_files_exist && !new_db_files_exist {
            Log.info("Migrating NostrDB to new file location…", for: .storage)
            do {
                try db_files.forEach { db_file in
                    let old_path = "\(old_db_path)/\(db_file)"
                    let new_path = "\(db_path)/\(db_file)"
                    try file_manager.moveItem(atPath: old_path, toPath: new_path)
                }
                Log.info("NostrDB files successfully migrated to the new location", for: .storage)
            } catch {
                throw Errors.db_file_migration_error
            }
        }
    }

    /// `true` only when *every* database file exists inside `path`.
    private static func db_files_exist(path: String) -> Bool {
        return db_files.allSatisfy { FileManager.default.fileExists(atPath: "\(path)/\($0)") }
    }

    /// Wraps an existing handle without opening anything.
    init(ndb: ndb_t) {
        self.ndb = ndb
        self.generation = 0
        self.path = nil
        self.owns_db = true
        self.closed = false
        // This simple initialization will cause subscriptions not to be ever called. Probably fine because this initializer is used only for empty example ndb instances.
        self.callbackHandler = Ndb.CallbackHandler()
    }

    /// Destroys the underlying database handle and bumps `generation`. No-op if already closed.
    func close() {
        guard !self.is_closed else { return }
        self.closed = true
        print("txn: CLOSING NOSTRDB")
        ndb_destroy(self.ndb.ndb)
        self.generation += 1
        print("txn: NOSTRDB CLOSED")
    }

    /// Re-opens a previously closed database at the same path.
    ///
    /// - Returns: `true` on success; `false` if the database is not closed or opening failed.
    func reopen() -> Bool {
        guard self.is_closed,
              let db = Self.open(path: self.path, owns_db_file: self.owns_db, callbackHandler: self.callbackHandler) else {
            return false
        }

        print("txn: NOSTRDB REOPENED (gen \(generation))")

        self.closed = false
        self.ndb = db
        return true
    }

    /// Looks up the parsed blocks of a note within an existing transaction.
    /// Returns `nil` when the database is closed or nostrdb has no blocks for `key`.
    func lookup_blocks_by_key_with_txn(_ key: NoteKey, txn: RawNdbTxnAccessible) -> NdbBlockGroup.BlocksMetadata? {
        // This call hands the raw ndb handle to C, so it must not be nil or destroyed.
        guard !self.is_closed else { return nil }
        guard let blocks = ndb_get_blocks_by_key(self.ndb.ndb, &txn.txn, key) else {
            return nil
        }

        return NdbBlockGroup.BlocksMetadata(ptr: blocks)
    }

    /// Looks up the parsed blocks of a note, wrapped in a new transaction.
    func lookup_blocks_by_key(_ key: NoteKey) -> SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>? {
        SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>.new(on: self) { txn in
            lookup_blocks_by_key_with_txn(key, txn: txn)
        }
    }

    /// Looks up a note by its database key within an existing transaction.
    /// The returned note is not owned (`owned: false`).
    func lookup_note_by_key_with_txn<Y>(_ key: NoteKey, txn: NdbTxn<Y>) -> NdbNote? {
        var size: Int = 0
        guard let note_p = ndb_get_note_by_key(&txn.txn, key, &size) else {
            return nil
        }
        let ptr = ndb_note_ptr(ptr: note_p)
        return NdbNote(note: ptr, size: size, owned: false, key: key)
    }

    /// Runs a full-text search.
    ///
    /// - Parameters:
    ///   - query: Text to search for.
    ///   - limit: Requested maximum number of results. The result buffer is a fixed-size
    ///     C array, so the returned count can never exceed that array's length.
    ///   - order: Result ordering.
    /// - Returns: Keys of the matching notes; empty when the search fails, finds nothing,
    ///   or no transaction can be opened.
    func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) -> [NoteKey] {
        guard let txn = NdbTxn(ndb: self) else { return [] }
        var results = ndb_text_search_results()
        let res = query.withCString { q in
            let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING
            // `clamping:` avoids a trap when `limit` does not fit in an Int32.
            var config = ndb_text_search_config(order: order, limit: Int32(clamping: limit))
            return ndb_text_search(&txn.txn, q, &results, &config)
        }

        if res == 0 {
            return []
        }

        let reported = Int(results.num_results)
        guard reported > 0 else { return [] }

        // `results.results` is a C array, imported into Swift as a homogeneous tuple.
        // View its storage as a buffer of elements instead of switching over every tuple index.
        // `prefix` clamps to the buffer length, so an unexpectedly large count cannot read out of bounds.
        return withUnsafeBytes(of: results.results) { raw -> [NoteKey] in
            let hits = raw.bindMemory(to: ndb_text_search_result.self)
            return hits.prefix(reported).map { $0.key.note_id }
        }
    }

    /// Looks up a note by its database key, wrapped in a new transaction.
    func lookup_note_by_key(_ key: NoteKey) -> NdbTxn<NdbNote?>? {
        return NdbTxn(ndb: self) { txn in
            lookup_note_by_key_with_txn(key, txn: txn)
        }
    }

    private func lookup_profile_by_key_inner<Y>(_ key: ProfileKey, txn: NdbTxn<Y>) -> ProfileRecord? {
        var size: Int = 0
        guard let profile_p = ndb_get_profile_by_key(&txn.txn, key, &size) else {
            return nil
        }

        return profile_flatbuf_to_record(ptr: profile_p, size: size, key: key)
    }

    /// Decodes a flatbuffer-encoded profile record located at `ptr`.
    /// Returns `nil` (after printing the error) if decoding throws.
    private func profile_flatbuf_to_record(ptr: UnsafeMutableRawPointer, size: Int, key: UInt64) -> ProfileRecord? {
        do {
            var buf = ByteBuffer(assumingMemoryBound: ptr, capacity: size)
            let rec: NdbProfileRecord = try getDebugCheckedRoot(byteBuffer: &buf)
            return ProfileRecord(data: rec, key: key)
        } catch {
            // Handle error appropriately
            print("UNUSUAL: \(error)")
            return nil
        }
    }

    private func lookup_note_with_txn_inner<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
        return id.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NdbNote? in
            var key: UInt64 = 0
            var size: Int = 0
            guard let baseAddress = ptr.baseAddress,
                  let note_p = ndb_get_note_by_id(&txn.txn, baseAddress, &size, &key) else {
                return nil
            }
            let ptr = ndb_note_ptr(ptr: note_p)
            return NdbNote(note: ptr, size: size, owned: false, key: key)
        }
    }

    private func lookup_profile_with_txn_inner<Y>(pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileRecord? {
        return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> ProfileRecord? in
            var size: Int = 0
            var key: UInt64 = 0

            guard let baseAddress = ptr.baseAddress,
                  let profile_p = ndb_get_profile_by_pubkey(&txn.txn, baseAddress, &size, &key)
            else {
                return nil
            }

            return profile_flatbuf_to_record(ptr: profile_p, size: size, key: key)
        }
    }

    /// Looks up a profile record by its database key within an existing transaction.
    func lookup_profile_by_key_with_txn<Y>(key: ProfileKey, txn: NdbTxn<Y>) -> ProfileRecord? {
        lookup_profile_by_key_inner(key, txn: txn)
    }

    /// Looks up a profile record by its database key, wrapped in a new transaction.
    func lookup_profile_by_key(key: ProfileKey) -> NdbTxn<ProfileRecord?>? {
        return NdbTxn(ndb: self) { txn in
            lookup_profile_by_key_inner(key, txn: txn)
        }
    }

    /// Looks up a note by its id within an existing transaction.
    func lookup_note_with_txn<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
        lookup_note_with_txn_inner(id: id, txn: txn)
    }

    /// Resolves the database key of the profile belonging to `pubkey`, or `nil` if unknown.
    func lookup_profile_key(_ pubkey: Pubkey) -> ProfileKey? {
        guard let txn = NdbTxn(ndb: self, with: { txn in
            lookup_profile_key_with_txn(pubkey, txn: txn)
        }) else {
            return nil
        }

        return txn.value
    }

    /// Resolves a profile key within an existing transaction. A key of `0` from nostrdb means "not found".
    func lookup_profile_key_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileKey? {
        return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> ProfileKey? in
            guard let p = ptr.baseAddress else { return nil }
            let r = ndb_get_profilekey_by_pubkey(&txn.txn, p)
            if r == 0 {
                return nil
            }
            return r
        }
    }

    /// Resolves a note key within an existing transaction. A key of `0` from nostrdb means "not found".
    func lookup_note_key_with_txn(_ id: NoteId, txn: some RawNdbTxnAccessible) -> NoteKey? {
        guard !is_closed else { return nil }
        return id.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NoteKey? in
            guard let p = ptr.baseAddress else {
                return nil
            }
            let r = ndb_get_notekey_by_id(&txn.txn, p)
            if r == 0 {
                return nil
            }
            return r
        }
    }

    /// Resolves the database key of the note with the given id, or `nil` if unknown.
    func lookup_note_key(_ id: NoteId) -> NoteKey? {
        guard let txn = NdbTxn(ndb: self, with: { txn in
            lookup_note_key_with_txn(id, txn: txn)
        }) else {
            return nil
        }

        return txn.value
    }

    /// Looks up a note by id, wrapped in a new (optionally named) transaction.
    func lookup_note(_ id: NoteId, txn_name: String? = nil) -> NdbTxn<NdbNote?>? {
        NdbTxn(ndb: self, name: txn_name) { txn in
            lookup_note_with_txn_inner(id: id, txn: txn)
        }
    }

    /// Looks up a profile by pubkey, wrapped in a new (optionally named) transaction.
    func lookup_profile(_ pubkey: Pubkey, txn_name: String? = nil) -> NdbTxn<ProfileRecord?>? {
        NdbTxn(ndb: self, name: txn_name) { txn in
            lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
        }
    }

    /// Looks up a profile by pubkey within an existing transaction.
    func lookup_profile_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileRecord? {
        lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
    }

    /// Hands a client-originated event string to nostrdb for ingestion.
    /// - Returns: `false` if the database is closed or nostrdb rejected the input.
    func process_client_event(_ str: String) -> Bool {
        guard !self.is_closed else { return false }
        return str.withCString { cstr in
            return ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
        }
    }

    /// Records when the profile of `pubkey` was last fetched. No-op when the database is closed.
    func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) {
        // `is_closed` (not just `closed`) because the raw handle is passed to C below
        // and may be nil on instances such as `Ndb.empty`.
        guard !is_closed else { return }
        let _ = pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> () in
            guard let p = ptr.baseAddress else { return }
            ndb_write_last_profile_fetch(ndb.ndb, p, fetched_at)
        }
    }

    /// Reads when the profile of `pubkey` was last fetched. A stored value of `0` is reported as `nil`.
    func read_profile_last_fetched<Y>(txn: NdbTxn<Y>, pubkey: Pubkey) -> UInt64? {
        guard !is_closed else { return nil }
        return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> UInt64? in
            guard let p = ptr.baseAddress else { return nil }
            let res = ndb_read_last_profile_fetch(&txn.txn, p)
            if res == 0 {
                return nil
            }

            return res
        }
    }

    /// Hands a relay-originated event string to nostrdb for ingestion.
    /// - Returns: `false` if the database is closed or nostrdb rejected the input.
    func process_event(_ str: String) -> Bool {
        guard !is_closed else { return false }
        return str.withCString { cstr in
            return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
        }
    }

    /// Hands a string containing multiple events to nostrdb for ingestion.
    /// - Returns: `false` if the database is closed or nostrdb rejected the input.
    func process_events(_ str: String) -> Bool {
        guard !is_closed else { return false }
        return str.withCString { cstr in
            return ndb_process_events(ndb.ndb, cstr, str.utf8.count) != 0
        }
    }

    /// Searches profiles matching `search`.
    ///
    /// - Parameters:
    ///   - search: The search text.
    ///   - limit: Maximum number of pubkeys to return. Non-positive limits yield no results.
    ///   - txn: Transaction to search in.
    /// - Returns: At most `limit` matching pubkeys, in the order nostrdb produces them.
    func search_profile<Y>(_ search: String, limit: Int, txn: NdbTxn<Y>) -> [Pubkey] {
        guard limit > 0 else { return [] }

        return search.withCString { q -> [Pubkey] in
            var pks = [Pubkey]()
            var s = ndb_search()
            guard ndb_search_profile(&txn.txn, &s, q) != 0 else {
                return pks
            }

            defer { ndb_search_profile_end(&s) }
            // The first hit is already available right after `ndb_search_profile` succeeds.
            pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))

            // Counting collected keys (rather than iterations) keeps the total within `limit`.
            while pks.count < limit, ndb_search_profile_next(&s) != 0 {
                pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))
            }

            return pks
        }
    }

    // MARK: NdbFilter queries and subscriptions

    /// Safe wrapper around the `ndb_query` C function
    /// - Parameters:
    ///   - txn: Database transaction
    ///   - filters: Array of NdbFilter objects
    ///   - maxResults: Maximum number of results to return. Non-positive values yield no results.
    /// - Returns: Array of note keys matching the filters
    /// - Throws: NdbStreamError if the query fails
    func query<Y>(with txn: NdbTxn<Y>, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] {
        guard maxResults > 0 else { return [] }
        // The C API takes an `int` capacity; clamp instead of trapping on huge values.
        let capacity = Int32(clamping: maxResults)

        let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
        defer { filtersPointer.deallocate() }

        for (index, ndbFilter) in filters.enumerated() {
            filtersPointer.advanced(by: index).initialize(to: ndbFilter.ndbFilter)
        }

        var count: Int32 = 0

        let results = UnsafeMutablePointer<ndb_query_result>.allocate(capacity: Int(capacity))
        defer { results.deallocate() }

        guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, capacity, &count) == 1 else {
            throw NdbStreamError.initialQueryFailed
        }

        // Never read past what was allocated, whatever count is reported.
        let found = min(Int(count), Int(capacity))
        guard found > 0 else { return [] }
        return (0..<found).map { results[$0].note_id }
    }

    /// Safe wrapper around `ndb_subscribe` that handles all pointer management
    /// - Parameters:
    ///   - filters: Array of NdbFilter objects
    /// - Returns: AsyncStream of StreamItem events for new matches only. The stream finishes
    ///   immediately if the database is closed or the subscription cannot be created.
    private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream<StreamItem> {
        return AsyncStream<StreamItem> { continuation in
            guard !self.is_closed else {
                continuation.finish()
                return
            }

            // Allocate filters pointer - will be deallocated when subscription ends
            // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope.
            let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
            for (index, ndbFilter) in filters.enumerated() {
                filtersPointer.advanced(by: index).initialize(to: ndbFilter.ndbFilter)
            }

            // Set up subscription. An id of 0 means nostrdb could not create it.
            let subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count))
            guard subid != 0 else {
                Log.error("ndb_subscribe failed, finishing stream", for: .ndb)
                filtersPointer.deallocate()
                continuation.finish()
                return
            }

            // Set the subscription callback
            let registration = Task {
                await self.setCallback(for: subid, callback: { noteKey in
                    continuation.yield(.event(noteKey))
                })
            }

            // Clean up when the consumer stops listening or the stream finishes.
            continuation.onTermination = { @Sendable _ in
                Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
                // Do not touch the database handle once it has been destroyed.
                if !self.is_closed {
                    ndb_unsubscribe(self.ndb.ndb, subid)
                }
                Task {
                    // Wait for the registration first, so that the callback cannot be
                    // (re-)inserted after it has been removed.
                    await registration.value
                    await self.unsetCallback(subscriptionId: subid)
                }
                filtersPointer.deallocate()
            }
        }
    }

    /// Streams the notes matching `filters`: first those already stored, then `.eose`,
    /// then notes arriving afterwards.
    ///
    /// - Parameters:
    ///   - filters: Filters to match.
    ///   - maxSimultaneousResults: Maximum number of already-stored results to fetch.
    /// - Throws: `NdbStreamError` if no transaction can be opened or the initial query fails.
    func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream<StreamItem> {
        // Fetch initial results
        guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }

        // Use our safe wrapper instead of direct C function call
        let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)

        // Create a subscription for new events
        let newEventsStream = ndbSubscribe(filters: filters)

        // Create a cascading stream that combines initial results with new events
        return AsyncStream<StreamItem> { continuation in
            // Stream all results already present in the database
            for noteId in noteIds {
                continuation.yield(.event(noteId))
            }

            // Indicate this is the end of the results currently present in the database
            continuation.yield(.eose)

            // Create a task to forward events from the subscription stream
            let forwardingTask = Task {
                for await item in newEventsStream {
                    continuation.yield(item)
                }
                continuation.finish()
            }

            // Handle termination by canceling the forwarding task
            continuation.onTermination = { @Sendable _ in
                forwardingTask.cancel()
            }
        }
    }

    /// Waits (without any timeout) until a note with the given id shows up in the database.
    /// Returns `nil` if the underlying stream ends without producing it.
    private func waitWithoutTimeout(for noteId: NoteId) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
        do {
            for try await item in try self.subscribe(filters: [NostrFilter(ids: [noteId])]) {
                switch item {
                case .eose:
                    continue
                case .event(let noteKey):
                    guard let txn = NdbTxn(ndb: self) else { throw NdbLookupError.cannotOpenTransaction }
                    guard let note = self.lookup_note_by_key_with_txn(noteKey, txn: txn) else { throw NdbLookupError.internalInconsistency }
                    if note.id == noteId {
                        Log.debug("ndb wait: %d has matching id %s. Returning transaction", for: .ndb, noteKey, noteId.hex())
                        return NdbTxn<NdbNote>.pure(ndb: self, val: note)
                    }
                }
            }
        }
        catch {
            if let error = error as? NdbStreamError { throw NdbLookupError.streamError(error) }
            else if let error = error as? NdbLookupError { throw error }
            else { throw .internalInconsistency }
        }
        return nil
    }

    /// Waits for a note with the given id to be available in the database.
    ///
    /// - Parameters:
    ///   - noteId: Id of the note to wait for.
    ///   - timeout: Maximum time to wait, in seconds (fractional values are honoured).
    /// - Returns: A transaction holding the note, or `nil` if the wait ended without finding it.
    /// - Throws: `NdbLookupError.timeout` when the timeout elapses first, or the lookup error that occurred.
    func waitFor(noteId: NoteId, timeout: TimeInterval = 10) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
        do {
            return try await withCheckedThrowingContinuation({ continuation in
                // The wait task and the timeout task race each other; the gate guarantees that
                // exactly one of them resumes the continuation.
                let gate = ResumeGate()
                let waitTask = Task {
                    do {
                        Log.debug("ndb_wait: Waiting for %s", for: .ndb, noteId.hex())
                        let result = try await self.waitWithoutTimeout(for: noteId)
                        if gate.claim() {
                            Log.debug("ndb_wait: Found %s", for: .ndb, noteId.hex())
                            continuation.resume(returning: result)
                        }
                    }
                    catch {
                        if Task.isCancelled {
                            return // the timeout task will handle throwing the timeout error
                        }
                        if gate.claim() {
                            Log.debug("ndb_wait: Error on %s: %s", for: .ndb, noteId.hex(), error.localizedDescription)
                            continuation.resume(throwing: error)
                        }
                    }
                }

                Task {
                    try await Task.sleep(for: .seconds(timeout))
                    if gate.claim() {
                        Log.debug("ndb_wait: Timeout on %s. Cancelling wait task…", for: .ndb, noteId.hex())
                        print("ndb_wait: throwing timeout error")
                        continuation.resume(throwing: NdbLookupError.timeout)
                    }
                    waitTask.cancel()
                }
            })
        }
        catch {
            if let error = error as? NdbLookupError { throw error }
            else { throw .internalInconsistency }
        }
    }

    // MARK: Internal ndb callback interfaces

    internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
        await self.callbackHandler.set(callback: callback, for: subscriptionId)
    }

    internal func unsetCallback(subscriptionId: UInt64) async {
        await self.callbackHandler.unset(subid: subscriptionId)
    }

    // MARK: Helpers

    enum Errors: Error {
        case cannot_find_db_path
        case db_file_migration_error
    }

    /// Lock-protected one-shot flag: `claim()` returns `true` for exactly one caller.
    private final class ResumeGate: @unchecked Sendable {
        private let lock = NSLock()
        private var claimed = false

        func claim() -> Bool {
            lock.lock()
            defer { lock.unlock() }
            if claimed { return false }
            claimed = true
            return true
        }
    }

    // MARK: Deinitialization

    deinit {
        print("txn: Ndb de-init")
        self.close()
    }
}


// MARK: - Extensions and helper structures and functions

extension Ndb {
    /// A class that is used to handle callbacks from nostrdb
    ///
    /// This is a separate class from `Ndb` because it simplifies the initialization logic
    actor CallbackHandler {
        /// Holds the ndb instance in the C codebase. Should be shared with `Ndb`
        var ndb: ndb_t? = nil
        /// A map from nostrdb subscription ids to callbacks
        var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:]

        func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) {
            subscriptionCallbackMap[subid] = callback
        }

        func unset(subid: UInt64) {
            subscriptionCallbackMap[subid] = nil
        }

        func set(ndb: ndb_t?) {
            self.ndb = ndb
        }

        /// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback
        ///
        /// Does nothing when no callback is registered for `subId`, when no ndb handle has been
        /// set yet, or when `maxCapacity` is not positive.
        func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) {
            guard maxCapacity > 0,
                  let callback = subscriptionCallbackMap[subId],
                  let ndb else {
                return
            }

            let result = UnsafeMutablePointer<UInt64>.allocate(capacity: Int(maxCapacity))
            defer { result.deallocate() }  // Ensure we deallocate memory before leaving the function to avoid memory leaks

            let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
            guard numberOfNotes > 0 else { return }
            for i in 0..<Int(numberOfNotes) {
                callback(result[i])
            }
        }
    }

    /// An item that comes out of a subscription stream
    enum StreamItem {
        /// End of currently stored events
        case eose
        /// An event in NostrDB available at the given note key
        case event(NoteKey)
    }

    /// An error that may happen during nostrdb streaming
    enum NdbStreamError: Error {
        case cannotOpenTransaction
        case cannotConvertFilter(any Error)
        case initialQueryFailed
        case timeout
    }

    /// An error that may happen when looking something up
    enum NdbLookupError: Error {
        case cannotOpenTransaction
        case streamError(NdbStreamError)
        case internalInconsistency
        case timeout
    }
}

/// This callback "trampoline" function will be called when new notes arrive for NostrDB subscriptions.
///
/// This is needed as a separate global function in order to allow us to pass it to the C code as a callback (We can't pass native Swift functions directly as callbacks).
///
/// - Parameters:
///   - ctx: A pointer to a context object setup during initialization. This allows this function to "find" the correct place to call. MUST be a pointer to a `CallbackHandler`, otherwise this will trigger a crash
///   - subid: The NostrDB subscription ID, which identifies the subscription that is being called back
@_cdecl("subscription_callback")
public func subscription_callback(ctx: UnsafeMutableRawPointer?, subid: UInt64) {
    guard let ctx else { return }
    let handler = Unmanaged<Ndb.CallbackHandler>.fromOpaque(ctx).takeUnretainedValue()
    Task {
        await handler.handleSubscriptionCallback(subId: subid)
    }
}

// NOTE(review): both build configurations currently perform the same unchecked `getRoot`;
// the `throws` in the signature is never exercised.
#if DEBUG
func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) throws -> T {
    return getRoot(byteBuffer: &byteBuffer)
}
#else
func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) throws -> T {
    return getRoot(byteBuffer: &byteBuffer)
}
#endif

/// Strips a leading `file://` scheme from `str`, turning a file URL string into a plain path.
/// Strings that do not start with `file://` are returned unchanged.
func remove_file_prefix(_ str: String) -> String {
    let prefix = "file://"
    guard str.hasPrefix(prefix) else { return str }
    return String(str.dropFirst(prefix.count))
}