RelayConnection.swift (7998B)
//
//  NostrConnection.swift
//  damus
//
//  Created by William Casarin on 2022-04-02.
//

import Combine
import Foundation

/// An event delivered to the owner of a `RelayConnection` through its `handleEvent` callback.
enum NostrConnectionEvent {
    /// A raw websocket event (connected, disconnected, error, message).
    case ws_event(WebSocketEvent)
    /// A text message that was successfully decoded by `decode_nostr_event`.
    case nostr_event(NostrResponse)
}

/// Manages a single websocket connection to a relay: connecting, sending requests,
/// decoding incoming messages and reconnecting (optionally with exponential backoff).
///
/// Connection-state changes are applied on the main queue.
final class RelayConnection: ObservableObject {
    @Published private(set) var isConnected = false
    @Published private(set) var isConnecting = false
    /// Set by `disablePermanently()`; checked by `reconnect()`.
    private var isDisabled = false

    /// Unix timestamp (seconds) of the most recent call to `connect(force:)` that proceeded.
    private(set) var last_connection_attempt: TimeInterval = 0
    /// Time of the last successful ping, if any.
    private(set) var last_pong: Date? = nil
    /// Current reconnect delay in seconds. Doubled by `reconnect_with_backoff()`,
    /// reset to 1.0 when a `.connected` event is received.
    private(set) var backoff: TimeInterval = 1.0
    private lazy var socket = WebSocket(relay_url.url)
    /// Subscription to `socket.subject`; dropped on `disconnect()`.
    private var subscriptionToken: AnyCancellable?

    private var handleEvent: (NostrConnectionEvent) -> ()
    private var processEvent: (WebSocketEvent) -> ()
    private let relay_url: RelayURL
    var log: RelayLog?

    /// - Parameters:
    ///   - url: The relay to connect to.
    ///   - handleEvent: Invoked on the main queue for websocket events and decoded nostr events.
    ///   - processEvent: Invoked synchronously from `receive(event:)` for every websocket event,
    ///     before any state is updated.
    init(url: RelayURL,
         handleEvent: @escaping (NostrConnectionEvent) -> (),
         processEvent: @escaping (WebSocketEvent) -> ())
    {
        self.relay_url = url
        self.handleEvent = handleEvent
        self.processEvent = processEvent
    }

    /// Pings the relay. On success records `last_pong`; on failure marks the connection
    /// as down and schedules a reconnect with backoff.
    func ping() {
        socket.ping { [weak self] err in
            guard let self else {
                return
            }

            if err == nil {
                self.last_pong = .now
                Log.info("Got pong from '%s'", for: .networking, self.relay_url.absoluteString)
                self.log?.add("Successful ping")
            } else {
                Log.info("Ping failed, reconnecting to '%s'", for: .networking, self.relay_url.absoluteString)
                // Nothing here guarantees this handler runs on the main queue, so hop to
                // main before touching the @Published state — the same way
                // `receive(event:)` applies every other connection-state change.
                DispatchQueue.main.async {
                    self.isConnected = false
                    self.isConnecting = false
                    self.reconnect_with_backoff()
                    self.log?.add("Ping failed")
                }
            }
        }
    }

    /// Opens the websocket and starts listening for its events.
    ///
    /// - Parameter force: When `false` (the default) the call is a no-op if the connection
    ///   is already connected or connecting.
    func connect(force: Bool = false) {
        if !force && (isConnected || isConnecting) {
            return
        }

        isConnecting = true
        last_connection_attempt = Date().timeIntervalSince1970

        // Socket events are delivered on a global queue; a stream failure is mapped to
        // an `.error` event and a normal completion to a `.disconnected` event.
        subscriptionToken = socket.subject
            .receive(on: DispatchQueue.global(qos: .default))
            .sink { [weak self] completion in
                switch completion {
                case .failure(let error):
                    self?.receive(event: .error(error))
                case .finished:
                    self?.receive(event: .disconnected(.normalClosure, nil))
                }
            } receiveValue: { [weak self] event in
                self?.receive(event: event)
            }

        socket.connect()
    }

    /// Closes the websocket, stops listening for its events and clears the connection flags.
    func disconnect() {
        socket.disconnect()
        subscriptionToken = nil

        isConnected = false
        isConnecting = false
    }

    /// Marks this connection as disabled so that `reconnect()` becomes a no-op.
    func disablePermanently() {
        isDisabled = true
    }

    /// Sends an already-encoded request string over the websocket.
    func send_raw(_ req: String) {
        socket.send(.string(req))
    }

    /// Encodes (if needed) and sends a request.
    ///
    /// - Parameters:
    ///   - req: Either a typed request, which is encoded with `make_nostr_req`, or a
    ///     pre-encoded custom string.
    ///   - callback: Called with the exact string that was sent. Not called when
    ///     encoding fails.
    func send(_ req: NostrRequestType, callback: ((String) -> Void)? = nil) {
        switch req {
        case .typical(let req):
            guard let req = make_nostr_req(req) else {
                print("failed to encode nostr req: \(req)")
                return
            }
            send_raw(req)
            callback?(req)

        case .custom(let req):
            send_raw(req)
            callback?(req)
        }
    }

    /// Handles a websocket event: forwards it to `processEvent`, updates connection state
    /// on the main queue, triggers reconnects, and finally forwards it to `handleEvent`.
    private func receive(event: WebSocketEvent) {
        processEvent(event)
        switch event {
        case .connected:
            DispatchQueue.main.async {
                self.backoff = 1.0
                self.isConnected = true
                self.isConnecting = false
            }
        case .message(let message):
            self.receive(message: message)
        case .disconnected(let closeCode, let reason):
            if closeCode != .normalClosure {
                // NOTE(review): the first placeholder is `%d` but the argument supplied is a
                // String — confirm how `Log` formats its arguments before changing this.
                Log.error("⚠️ Warning: RelayConnection (%d) closed with code: %s", for: .networking, String(describing: closeCode), String(describing: reason))
            }
            DispatchQueue.main.async {
                self.isConnected = false
                self.isConnecting = false
                self.reconnect()
            }
        case .error(let error):
            Log.error("⚠️ Warning: RelayConnection (%s) error: %s", for: .networking, self.relay_url.absoluteString, error.localizedDescription)
            let nserr = error as NSError
            // The two early returns below also skip the `handleEvent` forwarding and the
            // relay-log entry at the end of this method.
            if nserr.domain == NSPOSIXErrorDomain && nserr.code == 57 {
                // ignore socket not connected?
                return
            }
            if nserr.domain == NSURLErrorDomain && nserr.code == -999 {
                // these aren't real errors, it just means the task was cancelled
                return
            }
            DispatchQueue.main.async {
                self.isConnected = false
                self.isConnecting = false
                self.reconnect_with_backoff()
            }
        }
        DispatchQueue.main.async {
            self.handleEvent(.ws_event(event))
        }

        if let description = event.description {
            log?.add(description)
        }
    }

    /// Doubles the backoff delay and schedules a reconnect after it.
    ///
    /// NOTE(review): the delay has no upper bound; it only resets on a `.connected` event.
    func reconnect_with_backoff() {
        self.backoff *= 2.0
        self.reconnect_in(after: self.backoff)
    }

    /// Tears down and re-opens the connection, unless it is disabled, already connecting
    /// or already connected.
    func reconnect() {
        guard !isDisabled else {
            self.log?.add("Cancelling reconnect, relay is disabled")
            return
        }

        guard !isConnecting else {
            self.log?.add("Cancelling reconnect, already connecting")
            return
        }

        guard !self.isConnected else {
            self.log?.add("Cancelling reconnect, already connected")
            return
        }

        disconnect()
        connect()
        log?.add("Reconnecting...")
    }

    /// Schedules `reconnect()` on the main queue after the given delay in seconds.
    func reconnect_in(after: TimeInterval) {
        // Weak capture: a pending timer must not keep a released connection alive and
        // then reconnect it when the timer fires.
        DispatchQueue.main.asyncAfter(deadline: .now() + after) { [weak self] in
            self?.reconnect()
        }
    }

    /// Decodes an incoming websocket message and forwards the result to `handleEvent`
    /// on the main queue. Binary messages are re-processed as text when they are valid
    /// UTF-8 and otherwise ignored.
    private func receive(message: URLSessionWebSocketTask.Message) {
        switch message {
        case .string(let messageString):
            if let ev = decode_nostr_event(txt: messageString) {
                DispatchQueue.main.async {
                    self.handleEvent(.nostr_event(ev))
                }
                return
            }
            print("failed to decode event \(messageString)")
        case .data(let messageData):
            if let messageString = String(data: messageData, encoding: .utf8) {
                receive(message: .string(messageString))
            }
        @unknown default:
            print("An unexpected URLSessionWebSocketTask.Message was received.")
        }
    }
}

/// Encodes a typed request into the string that is sent over the websocket.
///
/// - Returns: The encoded request, or `nil` if the underlying encoder for that
///   request kind failed.
func make_nostr_req(_ req: NostrRequest) -> String? {
    switch req {
    case .subscribe(let sub):
        return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id)
    case .unsubscribe(let sub_id):
        return make_nostr_unsubscribe_req(sub_id)
    case .event(let ev):
        return make_nostr_push_event(ev: ev)
    case .auth(let ev):
        return make_nostr_auth_event(ev: ev)
    }
}

/// Builds an `["AUTH", <event>]` message, printing the encoded message as a side effect.
///
/// - Returns: The encoded message, or `nil` if `encode_json` fails for the event.
func make_nostr_auth_event(ev: NostrEvent) -> String? {
    guard let event = encode_json(ev) else {
        return nil
    }
    let encoded = "[\"AUTH\",\(event)]"
    print(encoded)
    return encoded
}

/// Builds an `["EVENT", <event>]` message, printing the encoded message as a side effect.
///
/// - Returns: The encoded message, or `nil` if `encode_json` fails for the event.
func make_nostr_push_event(ev: NostrEvent) -> String? {
    guard let event = encode_json(ev) else {
        return nil
    }
    let encoded = "[\"EVENT\",\(event)]"
    print(encoded)
    return encoded
}

/// Builds a `["CLOSE", "<sub_id>"]` message.
///
/// NOTE(review): `sub_id` is interpolated verbatim, without JSON escaping — presumably
/// ids never contain quotes or backslashes; verify against callers.
func make_nostr_unsubscribe_req(_ sub_id: String) -> String? {
    "[\"CLOSE\",\"\(sub_id)\"]"
}

/// Builds a `["REQ", "<sub_id>", <filter>, ...]` message with one JSON object per filter.
///
/// - Returns: The encoded message, or `nil` if any filter fails to encode.
///
/// NOTE(review): `sub_id` is interpolated verbatim, without JSON escaping — presumably
/// ids never contain quotes or backslashes; verify against callers.
func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? {
    let encoder = JSONEncoder()
    var req = "[\"REQ\",\"\(sub_id)\""
    for filter in filters {
        req += ","
        guard let filter_json = try? encoder.encode(filter) else {
            return nil
        }
        let filter_json_str = String(decoding: filter_json, as: UTF8.self)
        req += filter_json_str
    }
    req += "]"
    return req
}