RelayConnection.swift (9245B)
1 // 2 // NostrConnection.swift 3 // damus 4 // 5 // Created by William Casarin on 2022-04-02. 6 // 7 8 import Combine 9 import Foundation 10 11 enum NostrConnectionEvent { 12 /// Other non-message websocket events 13 case ws_connection_event(WSConnectionEvent) 14 /// A nostr response 15 case nostr_event(NostrResponse) 16 17 /// Models non-messaging websocket events 18 /// 19 /// Implementation note: Messaging events should use `.nostr_event` in `NostrConnectionEvent` 20 enum WSConnectionEvent { 21 case connected 22 case disconnected(URLSessionWebSocketTask.CloseCode, String?) 23 case error(Error) 24 25 static func from(full_ws_event: WebSocketEvent) -> Self? { 26 switch full_ws_event { 27 case .connected: 28 return .connected 29 case .message(_): 30 return nil 31 case .disconnected(let closeCode, let string): 32 return .disconnected(closeCode, string) 33 case .error(let error): 34 return .error(error) 35 } 36 } 37 } 38 } 39 40 final class RelayConnection: ObservableObject { 41 @Published private(set) var isConnected = false 42 @Published private(set) var isConnecting = false 43 private var isDisabled = false 44 45 private(set) var last_connection_attempt: TimeInterval = 0 46 private(set) var last_pong: Date? = nil 47 private(set) var backoff: TimeInterval = 1.0 48 private lazy var socket = WebSocket(relay_url.url) 49 private var subscriptionToken: AnyCancellable? 50 51 private var handleEvent: (NostrConnectionEvent) -> () 52 private var processEvent: (WebSocketEvent) -> () 53 private let relay_url: RelayURL 54 var log: RelayLog? 55 56 init(url: RelayURL, 57 handleEvent: @escaping (NostrConnectionEvent) -> (), 58 processUnverifiedWSEvent: @escaping (WebSocketEvent) -> ()) 59 { 60 self.relay_url = url 61 self.handleEvent = handleEvent 62 self.processEvent = processUnverifiedWSEvent 63 } 64 65 func ping() { 66 socket.ping { [weak self] err in 67 guard let self else { 68 return 69 } 70 71 if err == nil { 72 self.last_pong = .now 73 Log.info("Got pong from '%s'", for: .networking, self.relay_url.absoluteString) 74 self.log?.add("Successful ping") 75 } else { 76 Log.info("Ping failed, reconnecting to '%s'", for: .networking, self.relay_url.absoluteString) 77 self.isConnected = false 78 self.isConnecting = false 79 self.reconnect_with_backoff() 80 self.log?.add("Ping failed") 81 } 82 } 83 } 84 85 func connect(force: Bool = false) { 86 if !force && (isConnected || isConnecting) { 87 return 88 } 89 90 isConnecting = true 91 last_connection_attempt = Date().timeIntervalSince1970 92 93 subscriptionToken = socket.subject 94 .receive(on: DispatchQueue.global(qos: .default)) 95 .sink { [weak self] completion in 96 switch completion { 97 case .failure(let error): 98 self?.receive(event: .error(error)) 99 case .finished: 100 self?.receive(event: .disconnected(.normalClosure, nil)) 101 } 102 } receiveValue: { [weak self] event in 103 self?.receive(event: event) 104 } 105 106 socket.connect() 107 } 108 109 func disconnect() { 110 socket.disconnect() 111 subscriptionToken = nil 112 113 isConnected = false 114 isConnecting = false 115 } 116 117 func disablePermanently() { 118 isDisabled = true 119 } 120 121 func send_raw(_ req: String) { 122 socket.send(.string(req)) 123 } 124 125 func send(_ req: NostrRequestType, callback: ((String) -> Void)? = nil) { 126 switch req { 127 case .typical(let req): 128 guard let req = make_nostr_req(req) else { 129 print("failed to encode nostr req: \(req)") 130 return 131 } 132 send_raw(req) 133 callback?(req) 134 135 case .custom(let req): 136 send_raw(req) 137 callback?(req) 138 } 139 } 140 141 private func receive(event: WebSocketEvent) { 142 assert(!Thread.isMainThread, "This code must not be executed on the main thread") 143 processEvent(event) 144 switch event { 145 case .connected: 146 DispatchQueue.main.async { 147 self.backoff = 1.0 148 self.isConnected = true 149 self.isConnecting = false 150 } 151 case .message(let message): 152 self.receive(message: message) 153 case .disconnected(let closeCode, let reason): 154 if closeCode != .normalClosure { 155 Log.error("⚠️ Warning: RelayConnection (%d) closed with code: %s", for: .networking, String(describing: closeCode), String(describing: reason)) 156 } 157 DispatchQueue.main.async { 158 self.isConnected = false 159 self.isConnecting = false 160 self.reconnect() 161 } 162 case .error(let error): 163 Log.error("⚠️ Warning: RelayConnection (%s) error: %s", for: .networking, self.relay_url.absoluteString, error.localizedDescription) 164 let nserr = error as NSError 165 if nserr.domain == NSPOSIXErrorDomain && nserr.code == 57 { 166 // ignore socket not connected? 167 return 168 } 169 if nserr.domain == NSURLErrorDomain && nserr.code == -999 { 170 // these aren't real error, it just means task was cancelled 171 return 172 } 173 DispatchQueue.main.async { 174 self.isConnected = false 175 self.isConnecting = false 176 self.reconnect_with_backoff() 177 } 178 } 179 DispatchQueue.main.async { 180 guard let ws_connection_event = NostrConnectionEvent.WSConnectionEvent.from(full_ws_event: event) else { return } 181 self.handleEvent(.ws_connection_event(ws_connection_event)) 182 } 183 184 if let description = event.description { 185 log?.add(description) 186 } 187 } 188 189 func reconnect_with_backoff() { 190 self.backoff *= 2.0 191 self.reconnect_in(after: self.backoff) 192 } 193 194 func reconnect() { 195 guard !isConnecting && !isDisabled else { 196 self.log?.add("Cancelling reconnect, already connecting") 197 return // we're already trying to connect or we're disabled 198 } 199 200 guard !self.isConnected else { 201 self.log?.add("Cancelling reconnect, already connected") 202 return 203 } 204 205 disconnect() 206 connect() 207 log?.add("Reconnecting...") 208 } 209 210 func reconnect_in(after: TimeInterval) { 211 DispatchQueue.main.asyncAfter(deadline: .now() + after) { 212 self.reconnect() 213 } 214 } 215 216 private func receive(message: URLSessionWebSocketTask.Message) { 217 switch message { 218 case .string(let messageString): 219 // NOTE: Once we switch to the local relay model, 220 // we will not need to verify nostr events at this point. 221 if let ev = decode_and_verify_nostr_response(txt: messageString) { 222 DispatchQueue.main.async { 223 self.handleEvent(.nostr_event(ev)) 224 } 225 return 226 } 227 print("failed to decode event \(messageString)") 228 case .data(let messageData): 229 if let messageString = String(data: messageData, encoding: .utf8) { 230 receive(message: .string(messageString)) 231 } 232 @unknown default: 233 print("An unexpected URLSessionWebSocketTask.Message was received.") 234 } 235 } 236 } 237 238 func make_nostr_req(_ req: NostrRequest) -> String? { 239 switch req { 240 case .subscribe(let sub): 241 return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id) 242 case .unsubscribe(let sub_id): 243 return make_nostr_unsubscribe_req(sub_id) 244 case .event(let ev): 245 return make_nostr_push_event(ev: ev) 246 case .auth(let ev): 247 return make_nostr_auth_event(ev: ev) 248 } 249 } 250 251 func make_nostr_auth_event(ev: NostrEvent) -> String? { 252 guard let event = encode_json(ev) else { 253 return nil 254 } 255 let encoded = "[\"AUTH\",\(event)]" 256 print(encoded) 257 return encoded 258 } 259 260 func make_nostr_push_event(ev: NostrEvent) -> String? { 261 guard let event = encode_json(ev) else { 262 return nil 263 } 264 let encoded = "[\"EVENT\",\(event)]" 265 print(encoded) 266 return encoded 267 } 268 269 func make_nostr_unsubscribe_req(_ sub_id: String) -> String? { 270 "[\"CLOSE\",\"\(sub_id)\"]" 271 } 272 273 func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? { 274 let encoder = JSONEncoder() 275 var req = "[\"REQ\",\"\(sub_id)\"" 276 for filter in filters { 277 req += "," 278 guard let filter_json = try? encoder.encode(filter) else { 279 return nil 280 } 281 let filter_json_str = String(decoding: filter_json, as: UTF8.self) 282 req += filter_json_str 283 } 284 req += "]" 285 return req 286 }