PostBox.swift (5246B)
//
//  PostBox.swift
//  damus
//
//  Created by William Casarin on 2023-03-20.
//

import Foundation


/// Delivery state of one posted event for one relay.
class Relayer {
    /// Relay the event still has to be delivered to.
    let relay: RelayURL
    /// Number of send attempts made so far.
    var attempts: Int
    /// Seconds to wait after `last_attempt` before retrying; multiplied by 1.5 on every attempt.
    var retry_after: Double
    /// Unix timestamp (whole seconds) of the most recent attempt, or `nil` if none was made yet.
    var last_attempt: Int64?

    init(relay: RelayURL, attempts: Int, retry_after: Double) {
        self.relay = relay
        self.attempts = attempts
        self.retry_after = retry_after
        self.last_attempt = nil
    }
}

/// Callback policy used when a relay acknowledges a posted event.
enum OnFlush {
    /// Invoke the callback only for the first acknowledgement of the event.
    case once((PostedEvent) -> Void)
    /// Invoke the callback for every acknowledgement of the event.
    case all((PostedEvent) -> Void)
}

/// An event queued in the `PostBox` together with the relays that have not acknowledged it yet.
class PostedEvent {
    let event: NostrEvent
    /// Forwarded unchanged to `RelayPool.send` on every attempt.
    let skip_ephemeral: Bool
    /// Relays that have not acknowledged the event yet.
    var remaining: [Relayer]
    /// When set, the event is not sent (and can still be cancelled) before this date.
    let flush_after: Date?
    /// Set once an `OnFlush.once` callback has fired.
    var flushed_once: Bool
    let on_flush: OnFlush?

    init(event: NostrEvent, remaining: [RelayURL], skip_ephemeral: Bool, flush_after: Date?, on_flush: OnFlush?) {
        self.event = event
        self.skip_ephemeral = skip_ephemeral
        self.flush_after = flush_after
        self.on_flush = on_flush
        self.flushed_once = false
        // Every relay starts with zero attempts and a 10 second initial retry delay.
        self.remaining = remaining.map { url in
            Relayer(relay: url, attempts: 0, retry_after: 10.0)
        }
    }
}

/// Reasons why `PostBox.cancel_send` could not cancel an event.
enum CancelSendErr {
    /// No event with that id is queued.
    case nothing_to_cancel
    /// The event was queued without a delay.
    case not_delayed
    /// The delay has already elapsed.
    case too_late
}

/// Queues outgoing events and re-sends them to each relay until that relay acknowledges them.
class PostBox {
    let pool: RelayPool
    /// Events that still have at least one relay left to acknowledge them, keyed by note id.
    var events: [NoteId: PostedEvent]

    init(pool: RelayPool) {
        self.pool = pool
        self.events = [:]
        // NOTE(review): passing the bound method captures `self` strongly; if the pool stores
        // the handler this forms a PostBox <-> pool reference cycle. Confirm the intended lifetime.
        pool.register_handler(sub_id: "postbox", handler: handle_event)
    }

    /// Removes a delayed event from the queue before its delay elapses.
    ///
    /// Only works reliably on delay-sent events.
    ///
    /// - Parameter evid: Id of the queued event.
    /// - Returns: `nil` when the event was removed, otherwise the reason it could not be cancelled.
    func cancel_send(evid: NoteId) -> CancelSendErr? {
        guard let ev = events[evid] else {
            return .nothing_to_cancel
        }

        guard let after = ev.flush_after else {
            return .not_delayed
        }

        guard Date.now < after else {
            return .too_late
        }

        events.removeValue(forKey: evid)
        return nil
    }

    /// Sends every queued event to each remaining relay whose retry delay has elapsed.
    ///
    /// Events whose `flush_after` date is still in the future are skipped.
    func try_flushing_events() {
        // One clock snapshot for the whole pass instead of re-reading the time per event.
        let current = Date.now
        let now = Int64(current.timeIntervalSince1970)

        for event in events.values {
            // some are delayed
            if let after = event.flush_after, current < after {
                continue
            }

            for relayer in event.remaining {
                // A relayer that was never attempted is always due; otherwise wait out the backoff.
                if let last_attempt = relayer.last_attempt,
                   now < last_attempt + Int64(relayer.retry_after) {
                    continue
                }

                print("attempt #\(relayer.attempts) to flush event '\(event.event.content)' to \(relayer.relay) after \(relayer.retry_after) seconds")
                flush_event(event, to_relay: relayer)
            }
        }
    }

    /// Pool handler: treats any `.ok` response from a relay as an acknowledgement of that event.
    ///
    /// All other connection events and responses are ignored.
    func handle_event(relay_id: RelayURL, _ ev: NostrConnectionEvent) {
        guard case .nostr_event(let resp) = ev else {
            return
        }

        guard case .ok(let cr) = resp else {
            return
        }

        remove_relayer(relay_id: relay_id, event_id: cr.event_id)
    }

    /// Marks `relay_id` as done for the given event and fires the event's `on_flush` callback.
    ///
    /// The callback runs before the relay is taken out of `remaining`. The event is dropped
    /// from the queue once no relays remain.
    ///
    /// - Returns: `true` if a relayer was actually removed; `false` if the event is unknown
    ///   or the relay was not pending for it.
    @discardableResult
    func remove_relayer(relay_id: RelayURL, event_id: NoteId) -> Bool {
        guard let ev = events[event_id] else {
            return false
        }

        // NOTE(review): the callback also fires when `relay_id` is not in `remaining`
        // (e.g. a second OK from a relay that was retried) — confirm this is intended.
        if let on_flush = ev.on_flush {
            switch on_flush {
            case .once(let cb):
                if !ev.flushed_once {
                    ev.flushed_once = true
                    cb(ev)
                }
            case .all(let cb):
                cb(ev)
            }
        }

        let prev_count = ev.remaining.count
        ev.remaining.removeAll { $0.relay == relay_id }

        if ev.remaining.isEmpty {
            events.removeValue(forKey: event_id)
        }
        return ev.remaining.count != prev_count
    }

    /// Sends `event` to `to_relay`, or to every remaining relay when `to_relay` is `nil`,
    /// recording the attempt and growing the retry delay by 1.5x for each relayer used.
    private func flush_event(_ event: PostedEvent, to_relay: Relayer? = nil) {
        let relayers = to_relay.map { [$0] } ?? event.remaining

        for relayer in relayers {
            relayer.attempts += 1
            relayer.last_attempt = Int64(Date().timeIntervalSince1970)
            relayer.retry_after *= 1.5

            // The lookup only selects the log line; the send below happens either way.
            if pool.get_relay(relayer.relay) != nil {
                print("flushing event \(event.event.id) to \(relayer.relay)")
            } else {
                print("could not find relay when flushing: \(relayer.relay)")
            }
            pool.send(.event(event.event), to: [relayer.relay], skip_ephemeral: event.skip_ephemeral)
        }
    }

    /// Queues `event` for delivery and, unless a delay is given, sends it immediately.
    ///
    /// - Parameters:
    ///   - event: Event to deliver. Ignored if an event with the same id is already queued.
    ///   - to: Target relays; defaults to the urls of `pool.our_descriptors`.
    ///   - skip_ephemeral: Forwarded to `RelayPool.send`.
    ///   - delay: Seconds to wait before the first send. A delayed event is first sent by a
    ///     later `try_flushing_events()` call and can be cancelled with `cancel_send` until then.
    ///   - on_flush: Callback policy applied when relays acknowledge the event.
    func send(_ event: NostrEvent, to: [RelayURL]? = nil, skip_ephemeral: Bool = true, delay: TimeInterval? = nil, on_flush: OnFlush? = nil) {
        // Don't add event if we already have it
        guard events[event.id] == nil else {
            return
        }

        let remaining = to ?? pool.our_descriptors.map { $0.url }
        let after = delay.map { d in Date.now.addingTimeInterval(d) }
        let posted_ev = PostedEvent(event: event, remaining: remaining, skip_ephemeral: skip_ephemeral, flush_after: after, on_flush: on_flush)

        events[event.id] = posted_ev

        if after == nil {
            flush_event(posted_ev)
        }
    }
}