damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

RelayConnection.swift (7390B)


      1 //
      2 //  NostrConnection.swift
      3 //  damus
      4 //
      5 //  Created by William Casarin on 2022-04-02.
      6 //
      7 
      8 import Combine
      9 import Foundation
     10 
     11 enum NostrConnectionEvent {
     12     case ws_event(WebSocketEvent)
     13     case nostr_event(NostrResponse)
     14 }
     15 
     16 final class RelayConnection: ObservableObject {
     17     @Published private(set) var isConnected = false
     18     @Published private(set) var isConnecting = false
     19     private var isDisabled = false
     20     
     21     private(set) var last_connection_attempt: TimeInterval = 0
     22     private(set) var last_pong: Date? = nil
     23     private(set) var backoff: TimeInterval = 1.0
     24     private lazy var socket = WebSocket(relay_url.url)
     25     private var subscriptionToken: AnyCancellable?
     26 
     27     private var handleEvent: (NostrConnectionEvent) -> ()
     28     private var processEvent: (WebSocketEvent) -> ()
     29     private let relay_url: RelayURL
     30     var log: RelayLog?
     31 
     32     init(url: RelayURL,
     33          handleEvent: @escaping (NostrConnectionEvent) -> (),
     34          processEvent: @escaping (WebSocketEvent) -> ())
     35     {
     36         self.relay_url = url
     37         self.handleEvent = handleEvent
     38         self.processEvent = processEvent
     39     }
     40     
     41     func ping() {
     42         socket.ping { [weak self] err in
     43             guard let self else {
     44                 return
     45             }
     46             
     47             if err == nil {
     48                 self.last_pong = .now
     49                 self.log?.add("Successful ping")
     50             } else {
     51                 print("pong failed, reconnecting \(self.relay_url.id)")
     52                 self.isConnected = false
     53                 self.isConnecting = false
     54                 self.reconnect_with_backoff()
     55                 self.log?.add("Ping failed")
     56             }
     57         }
     58     }
     59     
     60     func connect(force: Bool = false) {
     61         if !force && (isConnected || isConnecting) {
     62             return
     63         }
     64         
     65         isConnecting = true
     66         last_connection_attempt = Date().timeIntervalSince1970
     67         
     68         subscriptionToken = socket.subject
     69             .receive(on: DispatchQueue.global(qos: .default))
     70             .sink { [weak self] completion in
     71                 switch completion {
     72                 case .failure(let error):
     73                     self?.receive(event: .error(error))
     74                 case .finished:
     75                     self?.receive(event: .disconnected(.normalClosure, nil))
     76                 }
     77             } receiveValue: { [weak self] event in
     78                 self?.receive(event: event)
     79             }
     80             
     81         socket.connect()
     82     }
     83 
     84     func disconnect() {
     85         socket.disconnect()
     86         subscriptionToken = nil
     87         
     88         isConnected = false
     89         isConnecting = false
     90     }
     91     
     92     func disablePermanently() {
     93         isDisabled = true
     94     }
     95     
     96     func send_raw(_ req: String) {
     97         socket.send(.string(req))
     98     }
     99     
    100     func send(_ req: NostrRequestType, callback: ((String) -> Void)? = nil) {
    101         switch req {
    102         case .typical(let req):
    103             guard let req = make_nostr_req(req) else {
    104                 print("failed to encode nostr req: \(req)")
    105                 return
    106             }
    107             send_raw(req)
    108             callback?(req)
    109             
    110         case .custom(let req):
    111             send_raw(req)
    112             callback?(req)
    113         }
    114     }
    115     
    116     private func receive(event: WebSocketEvent) {
    117         processEvent(event)
    118         switch event {
    119         case .connected:
    120             DispatchQueue.main.async {
    121                 self.backoff = 1.0
    122                 self.isConnected = true
    123                 self.isConnecting = false
    124             }
    125         case .message(let message):
    126             self.receive(message: message)
    127         case .disconnected(let closeCode, let reason):
    128             if closeCode != .normalClosure {
    129                 print("⚠️ Warning: RelayConnection (\(self.relay_url)) closed with code \(closeCode), reason: \(String(describing: reason))")
    130             }
    131             DispatchQueue.main.async {
    132                 self.isConnected = false
    133                 self.isConnecting = false
    134                 self.reconnect()
    135             }
    136         case .error(let error):
    137             print("⚠️ Warning: RelayConnection (\(self.relay_url)) error: \(error)")
    138             let nserr = error as NSError
    139             if nserr.domain == NSPOSIXErrorDomain && nserr.code == 57 {
    140                 // ignore socket not connected?
    141                 return
    142             }
    143             DispatchQueue.main.async {
    144                 self.isConnected = false
    145                 self.isConnecting = false
    146                 self.reconnect_with_backoff()
    147             }
    148         }
    149         DispatchQueue.main.async {
    150             self.handleEvent(.ws_event(event))
    151         }
    152         
    153         if let description = event.description {
    154             log?.add(description)
    155         }
    156     }
    157     
    158     func reconnect_with_backoff() {
    159         self.backoff *= 1.5
    160         self.reconnect_in(after: self.backoff)
    161     }
    162     
    163     func reconnect() {
    164         guard !isConnecting && !isDisabled else {
    165             return  // we're already trying to connect or we're disabled
    166         }
    167         disconnect()
    168         connect()
    169         log?.add("Reconnecting...")
    170     }
    171     
    172     func reconnect_in(after: TimeInterval) {
    173         DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    174             self.reconnect()
    175         }
    176     }
    177     
    178     private func receive(message: URLSessionWebSocketTask.Message) {
    179         switch message {
    180         case .string(let messageString):
    181             if let ev = decode_nostr_event(txt: messageString) {
    182                 DispatchQueue.main.async {
    183                     self.handleEvent(.nostr_event(ev))
    184                 }
    185                 return
    186             }
    187             print("failed to decode event \(messageString)")
    188         case .data(let messageData):
    189             if let messageString = String(data: messageData, encoding: .utf8) {
    190                 receive(message: .string(messageString))
    191             }
    192         @unknown default:
    193             print("An unexpected URLSessionWebSocketTask.Message was received.")
    194         }
    195     }
    196 }
    197 
    198 func make_nostr_req(_ req: NostrRequest) -> String? {
    199     switch req {
    200     case .subscribe(let sub):
    201         return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id)
    202     case .unsubscribe(let sub_id):
    203         return make_nostr_unsubscribe_req(sub_id)
    204     case .event(let ev):
    205         return make_nostr_push_event(ev: ev)
    206     case .auth(let ev):
    207         return make_nostr_auth_event(ev: ev)
    208     }
    209 }
    210 
    211 func make_nostr_auth_event(ev: NostrEvent) -> String? {
    212     guard let event = encode_json(ev) else {
    213         return nil
    214     }
    215     let encoded = "[\"AUTH\",\(event)]"
    216     print(encoded)
    217     return encoded
    218 }
    219 
    220 func make_nostr_push_event(ev: NostrEvent) -> String? {
    221     guard let event = encode_json(ev) else {
    222         return nil
    223     }
    224     let encoded = "[\"EVENT\",\(event)]"
    225     print(encoded)
    226     return encoded
    227 }
    228 
    229 func make_nostr_unsubscribe_req(_ sub_id: String) -> String? {
    230     "[\"CLOSE\",\"\(sub_id)\"]"
    231 }
    232 
    233 func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? {
    234     let encoder = JSONEncoder()
    235     var req = "[\"REQ\",\"\(sub_id)\""
    236     for filter in filters {
    237         req += ","
    238         guard let filter_json = try? encoder.encode(filter) else {
    239             return nil
    240         }
    241         let filter_json_str = String(decoding: filter_json, as: UTF8.self)
    242         req += filter_json_str
    243     }
    244     req += "]"
    245     return req
    246 }