damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

PostBox.swift (5246B)


      1 //
      2 //  PostBox.swift
      3 //  damus
      4 //
      5 //  Created by William Casarin on 2023-03-20.
      6 //
      7 
      8 import Foundation
      9 
     10 
     11 class Relayer {
     12     let relay: RelayURL
     13     var attempts: Int
     14     var retry_after: Double
     15     var last_attempt: Int64?
     16 
     17     init(relay: RelayURL, attempts: Int, retry_after: Double) {
     18         self.relay = relay
     19         self.attempts = attempts
     20         self.retry_after = retry_after
     21         self.last_attempt = nil
     22     }
     23 }
     24 
     25 enum OnFlush {
     26     case once((PostedEvent) -> Void)
     27     case all((PostedEvent) -> Void)
     28 }
     29 
     30 class PostedEvent {
     31     let event: NostrEvent
     32     let skip_ephemeral: Bool
     33     var remaining: [Relayer]
     34     let flush_after: Date?
     35     var flushed_once: Bool
     36     let on_flush: OnFlush?
     37 
     38     init(event: NostrEvent, remaining: [RelayURL], skip_ephemeral: Bool, flush_after: Date?, on_flush: OnFlush?) {
     39         self.event = event
     40         self.skip_ephemeral = skip_ephemeral
     41         self.flush_after = flush_after
     42         self.on_flush = on_flush
     43         self.flushed_once = false
     44         self.remaining = remaining.map {
     45             Relayer(relay: $0, attempts: 0, retry_after: 10.0)
     46         }
     47     }
     48 }
     49 
     50 enum CancelSendErr {
     51     case nothing_to_cancel
     52     case not_delayed
     53     case too_late
     54 }
     55 
     56 class PostBox {
     57     let pool: RelayPool
     58     var events: [NoteId: PostedEvent]
     59 
     60     init(pool: RelayPool) {
     61         self.pool = pool
     62         self.events = [:]
     63         pool.register_handler(sub_id: "postbox", handler: handle_event)
     64     }
     65     
     66     // only works reliably on delay-sent events
     67     func cancel_send(evid: NoteId) -> CancelSendErr? {
     68         guard let ev = events[evid] else {
     69             return .nothing_to_cancel
     70         }
     71         
     72         guard let after = ev.flush_after else {
     73             return .not_delayed
     74         }
     75         
     76         guard Date.now < after else {
     77             return .too_late
     78         }
     79         
     80         events.removeValue(forKey: evid)
     81         return nil
     82     }
     83     
     84     func try_flushing_events() {
     85         let now = Int64(Date().timeIntervalSince1970)
     86         for kv in events {
     87             let event = kv.value
     88             
     89             // some are delayed
     90             if let after = event.flush_after, Date.now.timeIntervalSince1970 < after.timeIntervalSince1970 {
     91                 continue
     92             }
     93             
     94             for relayer in event.remaining {
     95                 if relayer.last_attempt == nil ||
     96                    (now >= (relayer.last_attempt! + Int64(relayer.retry_after))) {
     97                     print("attempt #\(relayer.attempts) to flush event '\(event.event.content)' to \(relayer.relay) after \(relayer.retry_after) seconds")
     98                     flush_event(event, to_relay: relayer)
     99                 }
    100             }
    101         }
    102     }
    103 
    104     func handle_event(relay_id: RelayURL, _ ev: NostrConnectionEvent) {
    105         guard case .nostr_event(let resp) = ev else {
    106             return
    107         }
    108         
    109         guard case .ok(let cr) = resp else {
    110             return
    111         }
    112         
    113         remove_relayer(relay_id: relay_id, event_id: cr.event_id)
    114     }
    115 
    116     @discardableResult
    117     func remove_relayer(relay_id: RelayURL, event_id: NoteId) -> Bool {
    118         guard let ev = self.events[event_id] else {
    119             return false
    120         }
    121         
    122         if let on_flush = ev.on_flush {
    123             switch on_flush {
    124             case .once(let cb):
    125                 if !ev.flushed_once {
    126                     ev.flushed_once = true
    127                     cb(ev)
    128                 }
    129             case .all(let cb):
    130                 cb(ev)
    131             }
    132         }
    133         
    134         let prev_count = ev.remaining.count
    135         ev.remaining = ev.remaining.filter { $0.relay != relay_id }
    136         let after_count = ev.remaining.count
    137         if ev.remaining.count == 0 {
    138             self.events.removeValue(forKey: event_id)
    139         }
    140         return prev_count != after_count
    141     }
    142     
    143     private func flush_event(_ event: PostedEvent, to_relay: Relayer? = nil) {
    144         var relayers = event.remaining
    145         if let to_relay {
    146             relayers = [to_relay]
    147         }
    148         
    149         for relayer in relayers {
    150             relayer.attempts += 1
    151             relayer.last_attempt = Int64(Date().timeIntervalSince1970)
    152             relayer.retry_after *= 1.5
    153             if pool.get_relay(relayer.relay) != nil {
    154                 print("flushing event \(event.event.id) to \(relayer.relay)")
    155             } else {
    156                 print("could not find relay when flushing: \(relayer.relay)")
    157             }
    158             pool.send(.event(event.event), to: [relayer.relay], skip_ephemeral: event.skip_ephemeral)
    159         }
    160     }
    161 
    162     func send(_ event: NostrEvent, to: [RelayURL]? = nil, skip_ephemeral: Bool = true, delay: TimeInterval? = nil, on_flush: OnFlush? = nil) {
    163         // Don't add event if we already have it
    164         if events[event.id] != nil {
    165             return
    166         }
    167 
    168         let remaining = to ?? pool.our_descriptors.map { $0.url }
    169         let after = delay.map { d in Date.now.addingTimeInterval(d) }
    170         let posted_ev = PostedEvent(event: event, remaining: remaining, skip_ephemeral: skip_ephemeral, flush_after: after, on_flush: on_flush)
    171 
    172         events[event.id] = posted_ev
    173         
    174         if after == nil {
    175             flush_event(posted_ev)
    176         }
    177     }
    178 }
    179 
    180