damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

Ndb.swift (39268B)


      1 //
      2 //  Ndb.swift
      3 //  damus
      4 //
      5 //  Created by William Casarin on 2023-08-25.
      6 //
      7 
      8 import Foundation
      9 import OSLog
     10 
     11 fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus"
     12 
     13 enum NdbSearchOrder {
     14     case oldest_first
     15     case newest_first
     16 }
     17 
     18 
     19 enum DatabaseError: Error {
     20     case failed_open
     21 
     22     var errorDescription: String? {
     23         switch self {
     24         case .failed_open:
     25             return "Failed to open database"
     26         }
     27     }
     28 }
     29 
     30 class Ndb {
    /// Wrapper around the underlying nostrdb pointer; its `ndb` field is `nil` for the `empty` instance.
    var ndb: ndb_t
    /// Path passed to the initializer; `nil` means the default location (`db_path`) is used by `open`.
    let path: String?
    /// Whether this instance owns the database file (forwarded to `open` as `owns_db_file`).
    let owns_db: Bool
    /// Counter incremented every time `close()` destroys the database handle.
    var generation: Int
    /// Set by `close()` and cleared by `reopen()`.
    private var closed: Bool
    /// Handler handed (unretained) to nostrdb as the subscription callback context in `open`.
    private var callbackHandler: Ndb.CallbackHandler
     37 
     38     var is_closed: Bool {
     39         self.closed || self.ndb.ndb == nil
     40     }
     41     
     42     static func safemode() -> Ndb? {
     43         guard let path = db_path ?? old_db_path else { return nil }
     44 
     45         // delete the database and start fresh
     46         if Self.db_files_exist(path: path) {
     47             let file_manager = FileManager.default
     48             for db_file in db_files {
     49                 try? file_manager.removeItem(atPath: "\(path)/\(db_file)")
     50             }
     51         }
     52 
     53         guard let ndb = Ndb(path: path) else {
     54             return nil
     55         }
     56 
     57         return ndb
     58     }
     59 
     60     // NostrDB used to be stored on the app container's document directory
     61     static private var old_db_path: String? {
     62         guard let path = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first?.absoluteString else {
     63             return nil
     64         }
     65         return remove_file_prefix(path)
     66     }
     67 
     68     static var db_path: String? {
     69         // Use the `group.com.damus` container, so that it can be accessible from other targets
     70         // e.g. The notification service extension needs to access Ndb data, which is done through this shared file container.
     71         guard let containerURL = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: APPLICATION_GROUP_IDENTIFIER) else {
     72             return nil
     73         }
     74         return remove_file_prefix(containerURL.absoluteString)
     75     }
     76     
     77     static private var db_files: [String] = ["data.mdb", "lock.mdb"]
     78 
     79     static var empty: Ndb {
     80         print("txn: NOSTRDB EMPTY")
     81         return Ndb(ndb: ndb_t(ndb: nil))
     82     }
     83     
     84     static func open(path: String? = nil, owns_db_file: Bool = true, callbackHandler: Ndb.CallbackHandler) -> ndb_t? {
     85         var ndb_p: OpaquePointer? = nil
     86 
     87         let ingest_threads: Int32 = 4
     88         var mapsize: Int = 1024 * 1024 * 1024 * 32
     89         
     90         if path == nil && owns_db_file {
     91             // `nil` path indicates the default path will be used.
     92             // The default path changed over time, so migrate the database to the new location if needed
     93             do {
     94                 try Self.migrate_db_location_if_needed()
     95             }
     96             catch {
     97                 // If it fails to migrate, the app can still run without serious consequences. Log instead.
     98                 Log.error("Error migrating NostrDB to new file container", for: .storage)
     99             }
    100         }
    101         
    102         guard let db_path = Self.db_path,
    103               owns_db_file || Self.db_files_exist(path: db_path) else {
    104             return nil      // If the caller claims to not own the DB file, and the DB files do not exist, then we should not initialize Ndb
    105         }
    106 
    107         guard let path = path.map(remove_file_prefix) ?? Ndb.db_path else {
    108             return nil
    109         }
    110 
    111         let ok = path.withCString { testdir in
    112             var ok = false
    113             while !ok && mapsize > 1024 * 1024 * 700 {
    114                 var cfg = ndb_config(flags: 0, ingester_threads: ingest_threads, mapsize: mapsize, filter_context: nil, ingest_filter: nil, sub_cb_ctx: nil, sub_cb: nil)
    115                 
    116                 // Here we hook up the global callback function for subscription callbacks.
    117                 // We do an "unretained" pass here because the lifetime of the callback handler is larger than the lifetime of the nostrdb monitor in the C code.
    118                 // The NostrDB monitor that makes the callbacks should in theory _never_ outlive the callback handler.
    119                 //
    120                 // This means that:
    121                 // - for as long as nostrdb is running, its parent Ndb instance will be alive, keeping the callback handler alive.
    122                 // - when the Ndb instance is deinitialized — and the callback handler comes down with it — the `deinit` function will destroy the nostrdb monitor, preventing it from accessing freed memory.
    123                 //
    124                 // Therefore, we do not need to increase reference count to callbackHandler. The tightly coupled lifetimes will ensure that it is always alive when the ndb_monitor is alive.
    125                 let ctx: UnsafeMutableRawPointer = Unmanaged.passUnretained(callbackHandler).toOpaque()
    126                 ndb_config_set_subscription_callback(&cfg, subscription_callback, ctx)
    127                 
    128                 let res = ndb_init(&ndb_p, testdir, &cfg);
    129                 ok = res != 0;
    130                 if !ok {
    131                     Log.error("ndb_init failed: %d, reducing mapsize from %d to %d", for: .storage, res, mapsize, mapsize / 2)
    132                     mapsize /= 2
    133                 }
    134             }
    135             return ok
    136         }
    137 
    138         if !ok {
    139             return nil
    140         }
    141         
    142         let ndb_instance = ndb_t(ndb: ndb_p)
    143         Task { await callbackHandler.set(ndb: ndb_instance) }
    144         return ndb_instance
    145     }
    146 
    /// Opens the database and creates an instance around it.
    ///
    /// - Parameters:
    ///   - path: Directory to open; `nil` uses the default location.
    ///   - owns_db_file: Forwarded to `open`.
    /// - Returns: `nil` when `open` fails.
    init?(path: String? = nil, owns_db_file: Bool = true) {
        let handler = Ndb.CallbackHandler()
        let opened = Self.open(path: path, owns_db_file: owns_db_file, callbackHandler: handler)
        guard let opened else {
            return nil
        }

        self.ndb = opened
        self.path = path
        self.owns_db = owns_db_file
        self.callbackHandler = handler
        self.generation = 0
        self.closed = false
    }
    160     
    /// Moves the database files from the old location to the new one when only the old one has them.
    ///
    /// - Throws: `Errors.cannot_find_db_path` when either location cannot be resolved,
    ///   `Errors.db_file_migration_error` when moving a file fails.
    private static func migrate_db_location_if_needed() throws {
        guard let legacy_dir = old_db_path, let current_dir = db_path else {
            throw Errors.cannot_find_db_path
        }

        let legacy_has_files = Self.db_files_exist(path: legacy_dir)
        let current_has_files = Self.db_files_exist(path: current_dir)

        // Migration rules:
        // 1. If DB files exist in the old path but not the new one, move files to the new path
        // 2. If files do not exist anywhere, do nothing (let new DB be initialized)
        // 3. If files exist in the new path, but not the old one, nothing needs to be done
        // 4. If files exist on both, do nothing.
        // Scenario 4 likely means that user has downgraded and re-upgraded.
        // Although it might make sense to get the most recent DB, it might lead to data loss.
        // If we leave both intact, it makes it easier to fix later, as no data loss would occur.
        guard legacy_has_files, !current_has_files else {
            return
        }

        Log.info("Migrating NostrDB to new file location…", for: .storage)
        let file_manager = FileManager.default
        for db_file in db_files {
            do {
                try file_manager.moveItem(atPath: "\(legacy_dir)/\(db_file)", toPath: "\(current_dir)/\(db_file)")
            } catch {
                // The first failing move aborts the migration.
                throw Errors.db_file_migration_error
            }
        }
        Log.info("NostrDB files successfully migrated to the new location", for: .storage)
    }
    193     
    /// Whether every file listed in `db_files` exists inside the directory `path`.
    private static func db_files_exist(path: String) -> Bool {
        let file_manager = FileManager.default
        for db_file in db_files where !file_manager.fileExists(atPath: "\(path)/\(db_file)") {
            return false
        }
        return true
    }
    197 
    /// Wraps an already-created handle, with no path and a fresh callback handler.
    init(ndb: ndb_t) {
        // The handler created here is never registered with nostrdb, so subscriptions will not be called.
        // Probably fine because this initializer is used only for empty example ndb instances.
        self.callbackHandler = Ndb.CallbackHandler()
        self.ndb = ndb
        self.path = nil
        self.owns_db = true
        self.generation = 0
        self.closed = false
    }
    207     
    /// Destroys the underlying database handle and bumps `generation`. Does nothing if already closed.
    func close() {
        if is_closed {
            return
        }
        // Mark as closed before tearing down the handle.
        closed = true
        print("txn: CLOSING NOSTRDB")
        ndb_destroy(ndb.ndb)
        generation += 1
        print("txn: NOSTRDB CLOSED")
    }
    216 
    /// Re-opens a closed database using the stored path, ownership flag and callback handler.
    ///
    /// - Returns: `false` when the instance is not closed or `open` fails, `true` otherwise.
    func reopen() -> Bool {
        guard is_closed else {
            return false
        }
        guard let reopened = Self.open(path: path, owns_db_file: owns_db, callbackHandler: callbackHandler) else {
            return false
        }

        print("txn: NOSTRDB REOPENED (gen \(generation))")

        closed = false
        ndb = reopened
        return true
    }
    229     
    /// Fetches the blocks for a note key inside an existing transaction.
    /// Returns `nil` when `ndb_get_blocks_by_key` yields no pointer.
    func lookup_blocks_by_key_with_txn(_ key: NoteKey, txn: RawNdbTxnAccessible) -> NdbBlockGroup.BlocksMetadata? {
        if let blocks_ptr = ndb_get_blocks_by_key(self.ndb.ndb, &txn.txn, key) {
            return NdbBlockGroup.BlocksMetadata(ptr: blocks_ptr)
        }
        return nil
    }
    237 
    /// Fetches the blocks for a note key inside a new `SafeNdbTxn` created on this database.
    func lookup_blocks_by_key(_ key: NoteKey) -> SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>? {
        return SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>.new(on: self) { safe_txn in
            return self.lookup_blocks_by_key_with_txn(key, txn: safe_txn)
        }
    }
    243 
    /// Looks up a note by key inside an existing transaction.
    /// The returned note is created with `owned: false`. Returns `nil` when the key is not found.
    func lookup_note_by_key_with_txn<Y>(_ key: NoteKey, txn: NdbTxn<Y>) -> NdbNote? {
        var note_size: Int = 0
        if let raw_note = ndb_get_note_by_key(&txn.txn, key, &note_size) {
            return NdbNote(note: ndb_note_ptr(ptr: raw_note), size: note_size, owned: false, key: key)
        }
        return nil
    }
    252 
    /// Runs a full-text search and returns the keys of the matching notes.
    ///
    /// - Parameters:
    ///   - query: Text to search for.
    ///   - limit: Maximum number of results requested from nostrdb. Values outside the `Int32`
    ///     range are clamped instead of trapping.
    ///   - order: Whether newer or older notes come first.
    /// - Returns: Note keys in the order reported by nostrdb; empty when no transaction can be
    ///   created or the search reports failure.
    func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) -> [NoteKey] {
        guard let txn = NdbTxn(ndb: self) else { return [] }
        var results = ndb_text_search_results()
        let res = query.withCString { q in
            let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING
            var config = ndb_text_search_config(order: order, limit: Int32(clamping: limit))
            return ndb_text_search(&txn.txn, q, &results, &config)
        }

        if res == 0 {
            return []
        }

        // The C fixed-size array is imported as a homogeneous tuple, which Swift cannot index
        // dynamically. View the tuple's storage as a buffer of its element type instead of
        // spelling out one `switch` case per index.
        let entry_type = type(of: results.results.0)
        let reported = Int(results.num_results)
        return withUnsafeBytes(of: results.results) { raw -> [NoteKey] in
            let entries = raw.bindMemory(to: entry_type)
            // Never read past the tuple's storage, and tolerate a negative count.
            let found = max(0, min(reported, entries.count))
            return (0..<found).map { entries[$0].key.note_id }
        }
    }
    405 
    /// Looks up a note by key inside a new transaction; the note is carried by the returned transaction.
    func lookup_note_by_key(_ key: NoteKey) -> NdbTxn<NdbNote?>? {
        NdbTxn(ndb: self) { note_txn in
            return self.lookup_note_by_key_with_txn(key, txn: note_txn)
        }
    }
    411 
    /// Fetches the raw profile for `key` and decodes it into a `ProfileRecord`.
    /// Returns `nil` when the key is not found or decoding fails.
    private func lookup_profile_by_key_inner<Y>(_ key: ProfileKey, txn: NdbTxn<Y>) -> ProfileRecord? {
        var record_size: Int = 0
        if let record_ptr = ndb_get_profile_by_key(&txn.txn, key, &record_size) {
            return profile_flatbuf_to_record(ptr: record_ptr, size: record_size, key: key)
        }
        return nil
    }
    420 
    /// Decodes `size` bytes at `ptr` as an `NdbProfileRecord` and wraps it with its key.
    /// Returns `nil` (after printing the error) when `getDebugCheckedRoot` throws.
    private func profile_flatbuf_to_record(ptr: UnsafeMutableRawPointer, size: Int, key: UInt64) -> ProfileRecord? {
        var buffer = ByteBuffer(assumingMemoryBound: ptr, capacity: size)
        do {
            let record: NdbProfileRecord = try getDebugCheckedRoot(byteBuffer: &buffer)
            return ProfileRecord(data: record, key: key)
        } catch {
            // Decoding failures are reported and treated as "no profile".
            print("UNUSUAL: \(error)")
            return nil
        }
    }
    432 
    /// Looks up a note by its id bytes inside an existing transaction.
    /// The returned note is created with `owned: false` and the key reported by nostrdb.
    private func lookup_note_with_txn_inner<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
        return id.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> NdbNote? in
            guard let id_ptr = bytes.baseAddress else {
                return nil
            }
            var note_key: UInt64 = 0
            var note_size: Int = 0
            guard let raw_note = ndb_get_note_by_id(&txn.txn, id_ptr, &note_size, &note_key) else {
                return nil
            }
            return NdbNote(note: ndb_note_ptr(ptr: raw_note), size: note_size, owned: false, key: note_key)
        }
    }
    445 
    /// Looks up a profile by public key bytes inside an existing transaction and decodes it.
    /// Returns `nil` when the profile is not found or decoding fails.
    private func lookup_profile_with_txn_inner<Y>(pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileRecord? {
        return pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> ProfileRecord? in
            guard let pk_ptr = bytes.baseAddress else {
                return nil
            }

            var record_size: Int = 0
            var record_key: UInt64 = 0
            guard let record_ptr = ndb_get_profile_by_pubkey(&txn.txn, pk_ptr, &record_size, &record_key) else {
                return nil
            }

            return profile_flatbuf_to_record(ptr: record_ptr, size: record_size, key: record_key)
        }
    }
    460 
    /// Looks up a profile by key inside an existing transaction.
    func lookup_profile_by_key_with_txn<Y>(key: ProfileKey, txn: NdbTxn<Y>) -> ProfileRecord? {
        return self.lookup_profile_by_key_inner(key, txn: txn)
    }
    464 
    /// Looks up a profile by key inside a new transaction; the record is carried by the returned transaction.
    func lookup_profile_by_key(key: ProfileKey) -> NdbTxn<ProfileRecord?>? {
        NdbTxn(ndb: self) { profile_txn in
            return self.lookup_profile_by_key_inner(key, txn: profile_txn)
        }
    }
    470 
    /// Looks up a note by id inside an existing transaction.
    func lookup_note_with_txn<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
        return self.lookup_note_with_txn_inner(id: id, txn: txn)
    }
    474 
    /// Resolves the profile key for a public key using a new transaction.
    /// Returns `nil` when the transaction cannot be created or no key is found.
    func lookup_profile_key(_ pubkey: Pubkey) -> ProfileKey? {
        let key_txn = NdbTxn(ndb: self, with: { txn in
            lookup_profile_key_with_txn(pubkey, txn: txn)
        })

        if let key_txn {
            return key_txn.value
        }
        return nil
    }
    484 
    /// Resolves the profile key for a public key inside an existing transaction.
    /// A result of `0` from `ndb_get_profilekey_by_pubkey` is reported as `nil`.
    func lookup_profile_key_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileKey? {
        return pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> ProfileKey? in
            guard let pk_ptr = bytes.baseAddress else { return nil }
            let found_key = ndb_get_profilekey_by_pubkey(&txn.txn, pk_ptr)
            return found_key == 0 ? nil : found_key
        }
    }
    495 
    /// Resolves the note key for a note id inside an existing transaction.
    /// Returns `nil` when the database has been closed or `ndb_get_notekey_by_id` returns `0`.
    func lookup_note_key_with_txn(_ id: NoteId, txn: some RawNdbTxnAccessible) -> NoteKey? {
        if closed {
            return nil
        }
        return id.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> NoteKey? in
            guard let id_ptr = bytes.baseAddress else { return nil }
            let found_key = ndb_get_notekey_by_id(&txn.txn, id_ptr)
            return found_key == 0 ? nil : found_key
        }
    }
    509 
    /// Resolves the note key for a note id using a new transaction.
    /// Returns `nil` when the transaction cannot be created or no key is found.
    func lookup_note_key(_ id: NoteId) -> NoteKey? {
        let key_txn = NdbTxn(ndb: self, with: { txn in
            lookup_note_key_with_txn(id, txn: txn)
        })

        if let key_txn {
            return key_txn.value
        }
        return nil
    }
    519 
    /// Looks up a note by id inside a new transaction (optionally named `txn_name`).
    func lookup_note(_ id: NoteId, txn_name: String? = nil) -> NdbTxn<NdbNote?>? {
        return NdbTxn(ndb: self, name: txn_name) { note_txn in
            return self.lookup_note_with_txn_inner(id: id, txn: note_txn)
        }
    }
    525 
    /// Looks up a profile by public key inside a new transaction (optionally named `txn_name`).
    func lookup_profile(_ pubkey: Pubkey, txn_name: String? = nil) -> NdbTxn<ProfileRecord?>? {
        return NdbTxn(ndb: self, name: txn_name) { profile_txn in
            return self.lookup_profile_with_txn_inner(pubkey: pubkey, txn: profile_txn)
        }
    }
    531 
    /// Looks up a profile by public key inside an existing transaction.
    func lookup_profile_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileRecord? {
        return self.lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
    }
    535     
    /// Hands a string to `ndb_process_client_event` together with its UTF-8 byte count.
    ///
    /// - Returns: `false` when the database is closed or the C call returns `0`.
    func process_client_event(_ str: String) -> Bool {
        if is_closed {
            return false
        }
        return str.withCString { cstr in
            let status = ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count))
            return status != 0
        }
    }
    542 
    /// Records `fetched_at` as the last profile fetch value for `pubkey`. Does nothing when closed.
    func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) {
        if closed {
            return
        }
        pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> () in
            guard let pk_ptr = bytes.baseAddress else { return }
            ndb_write_last_profile_fetch(ndb.ndb, pk_ptr, fetched_at)
        }
    }
    550 
    /// Reads the last profile fetch value stored for `pubkey`.
    /// Returns `nil` when the database is closed or `ndb_read_last_profile_fetch` returns `0`.
    func read_profile_last_fetched<Y>(txn: NdbTxn<Y>, pubkey: Pubkey) -> UInt64? {
        if closed {
            return nil
        }
        return pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> UInt64? in
            guard let pk_ptr = bytes.baseAddress else { return nil }
            let fetched_at = ndb_read_last_profile_fetch(&txn.txn, pk_ptr)
            return fetched_at == 0 ? nil : fetched_at
        }
    }
    563 
    /// Hands a string to `ndb_process_event` together with its UTF-8 byte count.
    ///
    /// - Returns: `false` when the database is closed or the C call returns `0`.
    func process_event(_ str: String) -> Bool {
        if is_closed {
            return false
        }
        return str.withCString { cstr in
            let status = ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count))
            return status != 0
        }
    }
    570 
    /// Hands a string to `ndb_process_events` together with its UTF-8 byte count.
    ///
    /// - Returns: `false` when the database is closed or the C call returns `0`.
    func process_events(_ str: String) -> Bool {
        if is_closed {
            return false
        }
        return str.withCString { cstr in
            let status = ndb_process_events(ndb.ndb, cstr, str.utf8.count)
            return status != 0
        }
    }
    577 
    /// Searches profiles matching `search` and returns their public keys.
    ///
    /// - Parameters:
    ///   - search: Text handed to `ndb_search_profile`.
    ///   - limit: Maximum number of public keys to return. Values `<= 0` yield an empty result.
    ///   - txn: Transaction to run the search in.
    /// - Returns: At most `limit` public keys, in the order produced by nostrdb.
    func search_profile<Y>(_ search: String, limit: Int, txn: NdbTxn<Y>) -> [Pubkey] {
        // Nothing was requested, so do not start a search at all.
        guard limit > 0 else { return [] }

        return search.withCString { q -> [Pubkey] in
            var pks = [Pubkey]()
            var s = ndb_search()
            guard ndb_search_profile(&txn.txn, &s, q) != 0 else {
                return pks
            }

            // A successfully started search must always be ended.
            defer { ndb_search_profile_end(&s) }

            // The first match is available right after the search starts and counts towards `limit`.
            pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))

            while pks.count < limit, ndb_search_profile_next(&s) != 0 {
                pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))
            }

            return pks
        }
    }
    603     
    604     // MARK: NdbFilter queries and subscriptions
    605     
    606     /// Safe wrapper around the `ndb_query` C function
    607     /// - Parameters:
    608     ///   - txn: Database transaction
    609     ///   - filters: Array of NdbFilter objects
    610     ///   - maxResults: Maximum number of results to return
    611     /// - Returns: Array of note keys matching the filters
    612     /// - Throws: NdbStreamError if the query fails
    func query<Y>(with txn: NdbTxn<Y>, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] {
        // Copy the C filter structs into a contiguous buffer that only lives for this call.
        let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
        defer { filtersPointer.deallocate() }

        for (index, ndbFilter) in filters.enumerated() {
            // The memory is freshly allocated, so initialize it rather than assign to it.
            (filtersPointer + index).initialize(to: ndbFilter.ndbFilter)
        }

        // A negative `maxResults` is treated as "no results" instead of trapping in `allocate`.
        let capacity = max(0, maxResults)
        let results = UnsafeMutablePointer<ndb_query_result>.allocate(capacity: capacity)
        defer { results.deallocate() }

        // Out-parameter for the number of results written; a local variable avoids a heap
        // allocation and guarantees a defined starting value.
        var count: Int32 = 0

        guard ndb_query(&txn.txn, filtersPointer, Int32(clamping: filters.count), results, Int32(clamping: capacity), &count) == 1 else {
            throw NdbStreamError.initialQueryFailed
        }

        // Never read more entries than the result buffer can hold.
        let produced = max(0, min(Int(count), capacity))
        return (0..<produced).map { results[$0].note_id }
    }
    638     
    /// Safe wrapper around `ndb_subscribe` that handles all pointer management
    ///
    /// The stream finishes immediately if `ndb_subscribe` hands back subscription id 0, which this
    /// code treats as "no subscription was created".
    /// - Parameters:
    ///   - filters: Array of NdbFilter objects
    /// - Returns: AsyncStream of StreamItem events for new matches only
    private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream<StreamItem> {
        return AsyncStream<StreamItem> { continuation in
            // Allocate filters pointer - will be deallocated when subscription ends
            // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope.
            let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
            for (index, ndbFilter) in filters.enumerated() {
                filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
            }

            // Set up subscription
            let subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count))
            guard subid != 0 else {
                // No subscription exists, so nothing will ever be delivered: release the filters and end the stream
                // instead of leaving the consumer waiting forever.
                Log.debug("ndb_wait: stream: Failed to subscribe", for: .ndb)
                filtersPointer.deallocate()
                continuation.finish()
                return
            }

            // Set the subscription callback. The task handle is kept so that cleanup can be ordered after it.
            let registration = Task {
                await self.setCallback(for: subid, callback: { noteKey in
                    continuation.yield(.event(noteKey))
                })
            }

            // Single termination handler: unsubscribe, drop the callback, and release the filters.
            continuation.onTermination = { @Sendable _ in
                Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
                ndb_unsubscribe(self.ndb.ndb, subid)
                Task {
                    // Wait for the registration first; otherwise the removal could run before the callback
                    // is stored, which would leave the callback registered forever.
                    _ = await registration.result
                    await self.unsetCallback(subscriptionId: subid)
                }
                filtersPointer.deallocate()
            }
        }
    }
    696     
    /// Streams the notes matching `filters`: first the ones found by an initial query, then an `.eose`
    /// marker, then whatever the live subscription delivers.
    ///
    /// - Parameters:
    ///   - filters: Filters used both for the initial query and for the live subscription
    ///   - maxSimultaneousResults: Maximum number of results requested from the initial query
    /// - Returns: A stream of `.event` items for stored notes, one `.eose`, then forwarded subscription items
    /// - Throws: `NdbStreamError.cannotOpenTransaction` if no transaction can be opened, or whatever `query` throws
    func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream<StreamItem> {
        guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }

        // Snapshot of what is already stored, obtained through the pointer-safe query wrapper
        let storedNoteKeys = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)

        // Live feed of notes arriving from now on
        let liveUpdates = ndbSubscribe(filters: filters)

        return AsyncStream<StreamItem> { continuation in
            // Replay the snapshot, then mark where stored results end
            for storedKey in storedNoteKeys {
                continuation.yield(.event(storedKey))
            }
            continuation.yield(.eose)

            // Relay everything from the live feed and finish once it ends
            let relay = Task {
                for await update in liveUpdates {
                    continuation.yield(update)
                }
                continuation.finish()
            }

            // Stop relaying once this stream terminates
            continuation.onTermination = { @Sendable _ in
                relay.cancel()
            }
        }
    }
    731     
    /// Subscribes to the given note id and returns the first delivered note whose id matches.
    ///
    /// - Parameter noteId: The id of the note to wait for
    /// - Returns: A transaction wrapping the matching note, or `nil` if the stream ends without a match
    /// - Throws: `NdbLookupError` — stream errors are wrapped in `.streamError`, and any unexpected error
    ///   type is reported as `.internalInconsistency`
    private func waitWithoutTimeout(for noteId: NoteId) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
        do {
            for try await item in try self.subscribe(filters: [NostrFilter(ids: [noteId])]) {
                // Only events carry a note key; the end-of-stored-events marker is skipped
                guard case .event(let noteKey) = item else { continue }

                guard let txn = NdbTxn(ndb: self) else { throw NdbLookupError.cannotOpenTransaction }
                guard let note = self.lookup_note_by_key_with_txn(noteKey, txn: txn) else { throw NdbLookupError.internalInconsistency }
                guard note.id == noteId else { continue }

                Log.debug("ndb wait: %d has matching id %s. Returning transaction", for: .ndb, noteKey, noteId.hex())
                return NdbTxn<NdbNote>.pure(ndb: self, val: note)
            }
            return nil
        }
        catch {
            // Translate whatever was thrown into the lookup error domain
            switch error {
            case let streamError as NdbStreamError:
                throw NdbLookupError.streamError(streamError)
            case let lookupError as NdbLookupError:
                throw lookupError
            default:
                throw .internalInconsistency
            }
        }
    }
    755     
    /// Waits until the note with the given id is available, or until the timeout elapses.
    ///
    /// Two tasks race each other: one waits for the note, the other sleeps for `timeout` seconds.
    /// Whichever gets there first resumes the caller; the other one is cancelled.
    ///
    /// - Parameters:
    ///   - noteId: The id of the note to wait for
    ///   - timeout: How long to wait, in seconds. Fractional values are honoured.
    /// - Returns: A transaction wrapping the note, or `nil` if the wait ended without finding it
    /// - Throws: `NdbLookupError.timeout` when the timeout elapses first, or the lookup error raised while waiting
    func waitFor(noteId: NoteId, timeout: TimeInterval = 10) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
        do {
            return try await withCheckedThrowingContinuation({ continuation in
                // The two tasks below run concurrently, and a checked continuation must be resumed exactly once.
                // `claimResume` returns `true` for exactly one caller, under a lock.
                let resumeLock = NSLock()
                var hasResumed = false
                let claimResume: () -> Bool = {
                    resumeLock.withLock { () -> Bool in
                        guard !hasResumed else { return false }
                        hasResumed = true
                        return true
                    }
                }

                let waitTask = Task {
                    do {
                        Log.debug("ndb_wait: Waiting for %s", for: .ndb, noteId.hex())
                        let result = try await self.waitWithoutTimeout(for: noteId)
                        if claimResume() {
                            Log.debug("ndb_wait: Found %s", for: .ndb, noteId.hex())
                            continuation.resume(returning: result)
                        }
                    }
                    catch {
                        if Task.isCancelled {
                            return  // the timeout task will handle throwing the timeout error
                        }
                        if claimResume() {
                            Log.debug("ndb_wait: Error on %s: %s", for: .ndb, noteId.hex(), error.localizedDescription)
                            continuation.resume(throwing: error)
                        }
                    }
                }

                let timeoutTask = Task {
                    // Pass the interval through as-is so that sub-second timeouts are not truncated to zero
                    try await Task.sleep(for: .seconds(timeout))
                    if claimResume() {
                        Log.debug("ndb_wait: Timeout on %s. Cancelling wait task…", for: .ndb, noteId.hex())
                        print("ndb_wait: throwing timeout error")
                        continuation.resume(throwing: NdbLookupError.timeout)
                    }
                    waitTask.cancel()
                }

                // Once the wait is over there is nothing left to time out, so do not keep the sleeper around
                Task {
                    _ = await waitTask.result
                    timeoutTask.cancel()
                }
            })
        }
        catch {
            if let error = error as? NdbLookupError { throw error }
            else { throw .internalInconsistency }
        }
    }
    799     
    800     // MARK: Internal ndb callback interfaces
    801     
    /// Registers `callback` with the callback handler under the given subscription id.
    /// - Parameters:
    ///   - subscriptionId: The nostrdb subscription id the callback belongs to
    ///   - callback: Closure that receives a note key
    internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
        let handler = callbackHandler
        await handler.set(callback: callback, for: subscriptionId)
    }
    805     
    /// Removes the callback stored in the callback handler for the given subscription id.
    /// - Parameter subscriptionId: The nostrdb subscription id whose callback should be removed
    internal func unsetCallback(subscriptionId: UInt64) async {
        let handler = callbackHandler
        await handler.unset(subid: subscriptionId)
    }
    809     
    810     // MARK: Helpers
    811     
    /// Errors concerning the database path and database file migration (as the case names indicate).
    enum Errors: Error {
        case cannot_find_db_path, db_file_migration_error
    }
    816     
    817     // MARK: Deinitialization
    818 
    /// Logs the de-initialization and closes the database.
    deinit {
        print("txn: Ndb de-init")
        close()
    }
    823 }
    824 
    825 
    826 // MARK: - Extensions and helper structures and functions
    827 
    828 extension Ndb {
    829     /// A class that is used to handle callbacks from nostrdb
    830     ///
    831     /// This is a separate class from `Ndb` because it simplifies the initialization logic
    832     actor CallbackHandler {
    833         /// Holds the ndb instance in the C codebase. Should be shared with `Ndb`
    834         var ndb: ndb_t? = nil
    835         /// A map from nostrdb subscription ids to callbacks
    836         var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:]
    837         
    838         func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) {
    839             subscriptionCallbackMap[subid] = callback
    840         }
    841         
    842         func unset(subid: UInt64) {
    843             subscriptionCallbackMap[subid] = nil
    844         }
    845         
    846         func set(ndb: ndb_t?) {
    847             self.ndb = ndb
    848         }
    849         
    850         /// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback
    851         func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) {
    852             if let callback = subscriptionCallbackMap[subId] {
    853                 let result = UnsafeMutablePointer<UInt64>.allocate(capacity: Int(maxCapacity))
    854                 defer { result.deallocate() }  // Ensure we deallocate memory before leaving the function to avoid memory leaks
    855                 if let ndb {
    856                     let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
    857                     for i in 0..<numberOfNotes {
    858                         callback(result.advanced(by: Int(i)).pointee)
    859                     }
    860                 }
    861             }
    862         }
    863     }
    864     
    865     /// An item that comes out of a subscription stream
    866     enum StreamItem {
    867         /// End of currently stored events
    868         case eose
    869         /// An event in NostrDB available at the given note key
    870         case event(NoteKey)
    871     }
    872     
    873     /// An error that may happen during nostrdb streaming
    874     enum NdbStreamError: Error {
    875         case cannotOpenTransaction
    876         case cannotConvertFilter(any Error)
    877         case initialQueryFailed
    878         case timeout
    879     }
    880     
    881     /// An error that may happen when looking something up
    882     enum NdbLookupError: Error {
    883         case cannotOpenTransaction
    884         case streamError(NdbStreamError)
    885         case internalInconsistency
    886         case timeout
    887     }
    888 }
    889 
    890 /// This callback "trampoline" function will be called when new notes arrive for NostrDB subscriptions.
    891 ///
    892 /// This is needed as a separate global function in order to allow us to pass it to the C code as a callback (We can't pass native Swift fuctions directly as callbacks).
    893 ///
    894 /// - Parameters:
    895 ///   - ctx: A pointer to a context object setup during initialization. This allows this function to "find" the correct place to call. MUST be a pointer to a `CallbackHandler`, otherwise this will trigger a crash
    896 ///   - subid: The NostrDB subscription ID, which identifies the subscription that is being called back
    897 @_cdecl("subscription_callback")
    898 public func subscription_callback(ctx: UnsafeMutableRawPointer?, subid: UInt64) {
    899     guard let ctx else { return }
    900     let handler = Unmanaged<Ndb.CallbackHandler>.fromOpaque(ctx).takeUnretainedValue()
    901     Task {
    902         await handler.handleSubscriptionCallback(subId: subid)
    903     }
    904 }
    905 
    906 #if DEBUG
    907 func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) throws -> T {
    908     return getRoot(byteBuffer: &byteBuffer)
    909 }
    910 #else
    911 func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) throws -> T {
    912     return getRoot(byteBuffer: &byteBuffer)
    913 }
    914 #endif
    915 
    916 func remove_file_prefix(_ str: String) -> String {
    917     return str.replacingOccurrences(of: "file://", with: "")
    918 }
    919