damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

RelayConnection.swift (9245B)


      1 //
      2 //  NostrConnection.swift
      3 //  damus
      4 //
      5 //  Created by William Casarin on 2022-04-02.
      6 //
      7 
      8 import Combine
      9 import Foundation
     10 
     11 enum NostrConnectionEvent {
     12     /// Other non-message websocket events
     13     case ws_connection_event(WSConnectionEvent)
     14     /// A nostr response
     15     case nostr_event(NostrResponse)
     16     
     17     /// Models non-messaging websocket events
     18     ///
     19     /// Implementation note: Messaging events should use `.nostr_event` in `NostrConnectionEvent`
     20     enum WSConnectionEvent {
     21         case connected
     22         case disconnected(URLSessionWebSocketTask.CloseCode, String?)
     23         case error(Error)
     24         
     25         static func from(full_ws_event: WebSocketEvent) -> Self? {
     26             switch full_ws_event {
     27             case .connected:
     28                 return .connected
     29             case .message(_):
     30                 return nil
     31             case .disconnected(let closeCode, let string):
     32                 return .disconnected(closeCode, string)
     33             case .error(let error):
     34                 return .error(error)
     35             }
     36         }
     37     }
     38 }
     39 
     40 final class RelayConnection: ObservableObject {
     41     @Published private(set) var isConnected = false
     42     @Published private(set) var isConnecting = false
     43     private var isDisabled = false
     44     
     45     private(set) var last_connection_attempt: TimeInterval = 0
     46     private(set) var last_pong: Date? = nil
     47     private(set) var backoff: TimeInterval = 1.0
     48     private lazy var socket = WebSocket(relay_url.url)
     49     private var subscriptionToken: AnyCancellable?
     50 
     51     private var handleEvent: (NostrConnectionEvent) -> ()
     52     private var processEvent: (WebSocketEvent) -> ()
     53     private let relay_url: RelayURL
     54     var log: RelayLog?
     55 
     56     init(url: RelayURL,
     57          handleEvent: @escaping (NostrConnectionEvent) -> (),
     58          processUnverifiedWSEvent: @escaping (WebSocketEvent) -> ())
     59     {
     60         self.relay_url = url
     61         self.handleEvent = handleEvent
     62         self.processEvent = processUnverifiedWSEvent
     63     }
     64     
     65     func ping() {
     66         socket.ping { [weak self] err in
     67             guard let self else {
     68                 return
     69             }
     70             
     71             if err == nil {
     72                 self.last_pong = .now
     73                 Log.info("Got pong from '%s'", for: .networking, self.relay_url.absoluteString)
     74                 self.log?.add("Successful ping")
     75             } else {
     76                 Log.info("Ping failed, reconnecting to '%s'", for: .networking, self.relay_url.absoluteString)
     77                 self.isConnected = false
     78                 self.isConnecting = false
     79                 self.reconnect_with_backoff()
     80                 self.log?.add("Ping failed")
     81             }
     82         }
     83     }
     84     
     85     func connect(force: Bool = false) {
     86         if !force && (isConnected || isConnecting) {
     87             return
     88         }
     89         
     90         isConnecting = true
     91         last_connection_attempt = Date().timeIntervalSince1970
     92         
     93         subscriptionToken = socket.subject
     94             .receive(on: DispatchQueue.global(qos: .default))
     95             .sink { [weak self] completion in
     96                 switch completion {
     97                 case .failure(let error):
     98                     self?.receive(event: .error(error))
     99                 case .finished:
    100                     self?.receive(event: .disconnected(.normalClosure, nil))
    101                 }
    102             } receiveValue: { [weak self] event in
    103                 self?.receive(event: event)
    104             }
    105             
    106         socket.connect()
    107     }
    108 
    109     func disconnect() {
    110         socket.disconnect()
    111         subscriptionToken = nil
    112         
    113         isConnected = false
    114         isConnecting = false
    115     }
    116     
    117     func disablePermanently() {
    118         isDisabled = true
    119     }
    120     
    121     func send_raw(_ req: String) {
    122         socket.send(.string(req))
    123     }
    124     
    125     func send(_ req: NostrRequestType, callback: ((String) -> Void)? = nil) {
    126         switch req {
    127         case .typical(let req):
    128             guard let req = make_nostr_req(req) else {
    129                 print("failed to encode nostr req: \(req)")
    130                 return
    131             }
    132             send_raw(req)
    133             callback?(req)
    134             
    135         case .custom(let req):
    136             send_raw(req)
    137             callback?(req)
    138         }
    139     }
    140     
    141     private func receive(event: WebSocketEvent) {
    142         assert(!Thread.isMainThread, "This code must not be executed on the main thread")
    143         processEvent(event)
    144         switch event {
    145         case .connected:
    146             DispatchQueue.main.async {
    147                 self.backoff = 1.0
    148                 self.isConnected = true
    149                 self.isConnecting = false
    150             }
    151         case .message(let message):
    152             self.receive(message: message)
    153         case .disconnected(let closeCode, let reason):
    154             if closeCode != .normalClosure {
    155                 Log.error("⚠️ Warning: RelayConnection (%d) closed with code: %s", for: .networking, String(describing: closeCode), String(describing: reason))
    156             }
    157             DispatchQueue.main.async {
    158                 self.isConnected = false
    159                 self.isConnecting = false
    160                 self.reconnect()
    161             }
    162         case .error(let error):
    163             Log.error("⚠️ Warning: RelayConnection (%s) error: %s", for: .networking, self.relay_url.absoluteString, error.localizedDescription)
    164             let nserr = error as NSError
    165             if nserr.domain == NSPOSIXErrorDomain && nserr.code == 57 {
    166                 // ignore socket not connected?
    167                 return
    168             }
    169             if nserr.domain == NSURLErrorDomain && nserr.code == -999 {
    170                 // these aren't real error, it just means task was cancelled
    171                 return
    172             }
    173             DispatchQueue.main.async {
    174                 self.isConnected = false
    175                 self.isConnecting = false
    176                 self.reconnect_with_backoff()
    177             }
    178         }
    179         DispatchQueue.main.async {
    180             guard let ws_connection_event = NostrConnectionEvent.WSConnectionEvent.from(full_ws_event: event) else { return }
    181             self.handleEvent(.ws_connection_event(ws_connection_event))
    182         }
    183         
    184         if let description = event.description {
    185             log?.add(description)
    186         }
    187     }
    188     
    189     func reconnect_with_backoff() {
    190         self.backoff *= 2.0
    191         self.reconnect_in(after: self.backoff)
    192     }
    193     
    194     func reconnect() {
    195         guard !isConnecting && !isDisabled else {
    196             self.log?.add("Cancelling reconnect, already connecting")
    197             return  // we're already trying to connect or we're disabled
    198         }
    199 
    200         guard !self.isConnected else {
    201             self.log?.add("Cancelling reconnect, already connected")
    202             return
    203         }
    204 
    205         disconnect()
    206         connect()
    207         log?.add("Reconnecting...")
    208     }
    209     
    210     func reconnect_in(after: TimeInterval) {
    211         DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    212             self.reconnect()
    213         }
    214     }
    215     
    216     private func receive(message: URLSessionWebSocketTask.Message) {
    217         switch message {
    218         case .string(let messageString):
    219             // NOTE: Once we switch to the local relay model,
    220             // we will not need to verify nostr events at this point.
    221             if let ev = decode_and_verify_nostr_response(txt: messageString) {
    222                 DispatchQueue.main.async {
    223                     self.handleEvent(.nostr_event(ev))
    224                 }
    225                 return
    226             }
    227             print("failed to decode event \(messageString)")
    228         case .data(let messageData):
    229             if let messageString = String(data: messageData, encoding: .utf8) {
    230                 receive(message: .string(messageString))
    231             }
    232         @unknown default:
    233             print("An unexpected URLSessionWebSocketTask.Message was received.")
    234         }
    235     }
    236 }
    237 
    238 func make_nostr_req(_ req: NostrRequest) -> String? {
    239     switch req {
    240     case .subscribe(let sub):
    241         return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id)
    242     case .unsubscribe(let sub_id):
    243         return make_nostr_unsubscribe_req(sub_id)
    244     case .event(let ev):
    245         return make_nostr_push_event(ev: ev)
    246     case .auth(let ev):
    247         return make_nostr_auth_event(ev: ev)
    248     }
    249 }
    250 
    251 func make_nostr_auth_event(ev: NostrEvent) -> String? {
    252     guard let event = encode_json(ev) else {
    253         return nil
    254     }
    255     let encoded = "[\"AUTH\",\(event)]"
    256     print(encoded)
    257     return encoded
    258 }
    259 
    260 func make_nostr_push_event(ev: NostrEvent) -> String? {
    261     guard let event = encode_json(ev) else {
    262         return nil
    263     }
    264     let encoded = "[\"EVENT\",\(event)]"
    265     print(encoded)
    266     return encoded
    267 }
    268 
    269 func make_nostr_unsubscribe_req(_ sub_id: String) -> String? {
    270     "[\"CLOSE\",\"\(sub_id)\"]"
    271 }
    272 
    273 func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? {
    274     let encoder = JSONEncoder()
    275     var req = "[\"REQ\",\"\(sub_id)\""
    276     for filter in filters {
    277         req += ","
    278         guard let filter_json = try? encoder.encode(filter) else {
    279             return nil
    280         }
    281         let filter_json_str = String(decoding: filter_json, as: UTF8.self)
    282         req += filter_json_str
    283     }
    284     req += "]"
    285     return req
    286 }