damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

RelayConnection.swift (7998B)


      1 //
      2 //  NostrConnection.swift
      3 //  damus
      4 //
      5 //  Created by William Casarin on 2022-04-02.
      6 //
      7 
      8 import Combine
      9 import Foundation
     10 
     11 enum NostrConnectionEvent {
     12     case ws_event(WebSocketEvent)
     13     case nostr_event(NostrResponse)
     14 }
     15 
     16 final class RelayConnection: ObservableObject {
     17     @Published private(set) var isConnected = false
     18     @Published private(set) var isConnecting = false
     19     private var isDisabled = false
     20     
     21     private(set) var last_connection_attempt: TimeInterval = 0
     22     private(set) var last_pong: Date? = nil
     23     private(set) var backoff: TimeInterval = 1.0
     24     private lazy var socket = WebSocket(relay_url.url)
     25     private var subscriptionToken: AnyCancellable?
     26 
     27     private var handleEvent: (NostrConnectionEvent) -> ()
     28     private var processEvent: (WebSocketEvent) -> ()
     29     private let relay_url: RelayURL
     30     var log: RelayLog?
     31 
     32     init(url: RelayURL,
     33          handleEvent: @escaping (NostrConnectionEvent) -> (),
     34          processEvent: @escaping (WebSocketEvent) -> ())
     35     {
     36         self.relay_url = url
     37         self.handleEvent = handleEvent
     38         self.processEvent = processEvent
     39     }
     40     
     41     func ping() {
     42         socket.ping { [weak self] err in
     43             guard let self else {
     44                 return
     45             }
     46             
     47             if err == nil {
     48                 self.last_pong = .now
     49                 Log.info("Got pong from '%s'", for: .networking, self.relay_url.absoluteString)
     50                 self.log?.add("Successful ping")
     51             } else {
     52                 Log.info("Ping failed, reconnecting to '%s'", for: .networking, self.relay_url.absoluteString)
     53                 self.isConnected = false
     54                 self.isConnecting = false
     55                 self.reconnect_with_backoff()
     56                 self.log?.add("Ping failed")
     57             }
     58         }
     59     }
     60     
     61     func connect(force: Bool = false) {
     62         if !force && (isConnected || isConnecting) {
     63             return
     64         }
     65         
     66         isConnecting = true
     67         last_connection_attempt = Date().timeIntervalSince1970
     68         
     69         subscriptionToken = socket.subject
     70             .receive(on: DispatchQueue.global(qos: .default))
     71             .sink { [weak self] completion in
     72                 switch completion {
     73                 case .failure(let error):
     74                     self?.receive(event: .error(error))
     75                 case .finished:
     76                     self?.receive(event: .disconnected(.normalClosure, nil))
     77                 }
     78             } receiveValue: { [weak self] event in
     79                 self?.receive(event: event)
     80             }
     81             
     82         socket.connect()
     83     }
     84 
     85     func disconnect() {
     86         socket.disconnect()
     87         subscriptionToken = nil
     88         
     89         isConnected = false
     90         isConnecting = false
     91     }
     92     
     93     func disablePermanently() {
     94         isDisabled = true
     95     }
     96     
     97     func send_raw(_ req: String) {
     98         socket.send(.string(req))
     99     }
    100     
    101     func send(_ req: NostrRequestType, callback: ((String) -> Void)? = nil) {
    102         switch req {
    103         case .typical(let req):
    104             guard let req = make_nostr_req(req) else {
    105                 print("failed to encode nostr req: \(req)")
    106                 return
    107             }
    108             send_raw(req)
    109             callback?(req)
    110             
    111         case .custom(let req):
    112             send_raw(req)
    113             callback?(req)
    114         }
    115     }
    116     
    117     private func receive(event: WebSocketEvent) {
    118         processEvent(event)
    119         switch event {
    120         case .connected:
    121             DispatchQueue.main.async {
    122                 self.backoff = 1.0
    123                 self.isConnected = true
    124                 self.isConnecting = false
    125             }
    126         case .message(let message):
    127             self.receive(message: message)
    128         case .disconnected(let closeCode, let reason):
    129             if closeCode != .normalClosure {
    130                 Log.error("⚠️ Warning: RelayConnection (%d) closed with code: %s", for: .networking, String(describing: closeCode), String(describing: reason))
    131             }
    132             DispatchQueue.main.async {
    133                 self.isConnected = false
    134                 self.isConnecting = false
    135                 self.reconnect()
    136             }
    137         case .error(let error):
    138             Log.error("⚠️ Warning: RelayConnection (%s) error: %s", for: .networking, self.relay_url.absoluteString, error.localizedDescription)
    139             let nserr = error as NSError
    140             if nserr.domain == NSPOSIXErrorDomain && nserr.code == 57 {
    141                 // ignore socket not connected?
    142                 return
    143             }
    144             if nserr.domain == NSURLErrorDomain && nserr.code == -999 {
    145                 // these aren't real error, it just means task was cancelled
    146                 return
    147             }
    148             DispatchQueue.main.async {
    149                 self.isConnected = false
    150                 self.isConnecting = false
    151                 self.reconnect_with_backoff()
    152             }
    153         }
    154         DispatchQueue.main.async {
    155             self.handleEvent(.ws_event(event))
    156         }
    157         
    158         if let description = event.description {
    159             log?.add(description)
    160         }
    161     }
    162     
    163     func reconnect_with_backoff() {
    164         self.backoff *= 2.0
    165         self.reconnect_in(after: self.backoff)
    166     }
    167     
    168     func reconnect() {
    169         guard !isConnecting && !isDisabled else {
    170             self.log?.add("Cancelling reconnect, already connecting")
    171             return  // we're already trying to connect or we're disabled
    172         }
    173 
    174         guard !self.isConnected else {
    175             self.log?.add("Cancelling reconnect, already connected")
    176             return
    177         }
    178 
    179         disconnect()
    180         connect()
    181         log?.add("Reconnecting...")
    182     }
    183     
    184     func reconnect_in(after: TimeInterval) {
    185         DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    186             self.reconnect()
    187         }
    188     }
    189     
    190     private func receive(message: URLSessionWebSocketTask.Message) {
    191         switch message {
    192         case .string(let messageString):
    193             if let ev = decode_nostr_event(txt: messageString) {
    194                 DispatchQueue.main.async {
    195                     self.handleEvent(.nostr_event(ev))
    196                 }
    197                 return
    198             }
    199             print("failed to decode event \(messageString)")
    200         case .data(let messageData):
    201             if let messageString = String(data: messageData, encoding: .utf8) {
    202                 receive(message: .string(messageString))
    203             }
    204         @unknown default:
    205             print("An unexpected URLSessionWebSocketTask.Message was received.")
    206         }
    207     }
    208 }
    209 
    210 func make_nostr_req(_ req: NostrRequest) -> String? {
    211     switch req {
    212     case .subscribe(let sub):
    213         return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id)
    214     case .unsubscribe(let sub_id):
    215         return make_nostr_unsubscribe_req(sub_id)
    216     case .event(let ev):
    217         return make_nostr_push_event(ev: ev)
    218     case .auth(let ev):
    219         return make_nostr_auth_event(ev: ev)
    220     }
    221 }
    222 
    223 func make_nostr_auth_event(ev: NostrEvent) -> String? {
    224     guard let event = encode_json(ev) else {
    225         return nil
    226     }
    227     let encoded = "[\"AUTH\",\(event)]"
    228     print(encoded)
    229     return encoded
    230 }
    231 
    232 func make_nostr_push_event(ev: NostrEvent) -> String? {
    233     guard let event = encode_json(ev) else {
    234         return nil
    235     }
    236     let encoded = "[\"EVENT\",\(event)]"
    237     print(encoded)
    238     return encoded
    239 }
    240 
    241 func make_nostr_unsubscribe_req(_ sub_id: String) -> String? {
    242     "[\"CLOSE\",\"\(sub_id)\"]"
    243 }
    244 
    245 func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? {
    246     let encoder = JSONEncoder()
    247     var req = "[\"REQ\",\"\(sub_id)\""
    248     for filter in filters {
    249         req += ","
    250         guard let filter_json = try? encoder.encode(filter) else {
    251             return nil
    252         }
    253         let filter_json_str = String(decoding: filter_json, as: UTF8.self)
    254         req += filter_json_str
    255     }
    256     req += "]"
    257     return req
    258 }