RelayConnection.swift (7390B)
//
//  NostrConnection.swift
//  damus
//
//  Created by William Casarin on 2022-04-02.
//

import Combine
import Foundation

/// An event reported by a `RelayConnection` through its `handleEvent` callback.
enum NostrConnectionEvent {
    /// A raw websocket event (connected, message, disconnected, error).
    case ws_event(WebSocketEvent)
    /// A text message that `decode_nostr_event` was able to decode.
    case nostr_event(NostrResponse)
}

/// Manages a single websocket connection to a relay: connecting, pinging,
/// sending requests, decoding incoming messages and reconnecting when the
/// connection drops.
final class RelayConnection: ObservableObject {
    @Published private(set) var isConnected = false
    @Published private(set) var isConnecting = false
    /// Set by `disablePermanently()`. Only `reconnect()` checks this flag.
    private var isDisabled = false

    /// Unix timestamp (seconds) recorded each time `connect(force:)` starts a connection attempt.
    private(set) var last_connection_attempt: TimeInterval = 0
    /// Time of the most recent successful ping, or `nil` if none has succeeded yet.
    private(set) var last_pong: Date? = nil
    /// Delay in seconds used by `reconnect_with_backoff()`. It is multiplied by 1.5
    /// on every backoff and reset to 1.0 when a `.connected` event arrives.
    private(set) var backoff: TimeInterval = 1.0
    private lazy var socket = WebSocket(relay_url.url)
    /// Keeps the subscription to `socket.subject` alive; cleared by `disconnect()`.
    private var subscriptionToken: AnyCancellable?

    /// Invoked on the main queue with websocket events and decoded nostr events.
    private var handleEvent: (NostrConnectionEvent) -> ()
    /// Invoked synchronously at the start of `receive(event:)`, before any other handling.
    private var processEvent: (WebSocketEvent) -> ()
    private let relay_url: RelayURL
    var log: RelayLog?

    /// - Parameters:
    ///   - url: The relay to connect to.
    ///   - handleEvent: Called on the main queue for every websocket event that is
    ///     not ignored, and for every decoded nostr event.
    ///   - processEvent: Called first for every websocket event, on whatever queue
    ///     the event was received on.
    init(url: RelayURL,
         handleEvent: @escaping (NostrConnectionEvent) -> (),
         processEvent: @escaping (WebSocketEvent) -> ())
    {
        self.relay_url = url
        self.handleEvent = handleEvent
        self.processEvent = processEvent
    }

    /// Pings the relay. On success records `last_pong`; on failure marks the
    /// connection as down and schedules a reconnect with backoff.
    func ping() {
        socket.ping { [weak self] err in
            // The queue this completion runs on is decided by `WebSocket`, which is
            // not visible here. Hop to the main queue so the `@Published` flags and
            // `backoff` are mutated on the same queue `receive(event:)` uses for them.
            DispatchQueue.main.async { [weak self] in
                guard let self else {
                    return
                }

                if err == nil {
                    self.last_pong = .now
                    self.log?.add("Successful ping")
                } else {
                    print("pong failed, reconnecting \(self.relay_url.id)")
                    self.isConnected = false
                    self.isConnecting = false
                    self.reconnect_with_backoff()
                    self.log?.add("Ping failed")
                }
            }
        }
    }

    /// Starts a connection attempt.
    ///
    /// - Parameter force: When `false` (the default) the call is ignored if the
    ///   connection is already connected or connecting.
    func connect(force: Bool = false) {
        if !force && (isConnected || isConnecting) {
            return
        }

        isConnecting = true
        last_connection_attempt = Date().timeIntervalSince1970

        // Socket events are delivered to `receive(event:)` on a global queue.
        // A completion of the subject is translated into an equivalent event.
        subscriptionToken = socket.subject
            .receive(on: DispatchQueue.global(qos: .default))
            .sink { [weak self] completion in
                switch completion {
                case .failure(let error):
                    self?.receive(event: .error(error))
                case .finished:
                    self?.receive(event: .disconnected(.normalClosure, nil))
                }
            } receiveValue: { [weak self] event in
                self?.receive(event: event)
            }

        socket.connect()
    }

    /// Disconnects the socket, drops the event subscription and clears the
    /// connection flags.
    func disconnect() {
        socket.disconnect()
        subscriptionToken = nil

        isConnected = false
        isConnecting = false
    }

    /// Prevents all future `reconnect()` calls from doing anything.
    func disablePermanently() {
        isDisabled = true
    }

    /// Sends an already-encoded request string over the socket.
    func send_raw(_ req: String) {
        socket.send(.string(req))
    }

    /// Encodes (if necessary) and sends a request.
    ///
    /// - Parameters:
    ///   - req: Either a typed request, encoded with `make_nostr_req`, or a custom
    ///     pre-encoded string.
    ///   - callback: Called with the exact string that was sent. Not called when
    ///     encoding a typed request fails.
    func send(_ req: NostrRequestType, callback: ((String) -> Void)? = nil) {
        switch req {
        case .typical(let req):
            guard let req = make_nostr_req(req) else {
                print("failed to encode nostr req: \(req)")
                return
            }
            send_raw(req)
            callback?(req)

        case .custom(let req):
            send_raw(req)
            callback?(req)
        }
    }

    /// Handles one websocket event: updates connection state, triggers
    /// reconnects, forwards the event to `handleEvent` and logs it.
    private func receive(event: WebSocketEvent) {
        processEvent(event)
        switch event {
        case .connected:
            DispatchQueue.main.async {
                self.backoff = 1.0
                self.isConnected = true
                self.isConnecting = false
            }
        case .message(let message):
            self.receive(message: message)
        case .disconnected(let closeCode, let reason):
            if closeCode != .normalClosure {
                print("⚠️ Warning: RelayConnection (\(self.relay_url)) closed with code \(closeCode), reason: \(String(describing: reason))")
            }
            DispatchQueue.main.async {
                self.isConnected = false
                self.isConnecting = false
                self.reconnect()
            }
        case .error(let error):
            print("⚠️ Warning: RelayConnection (\(self.relay_url)) error: \(error)")
            let nserr = error as NSError
            if nserr.domain == NSPOSIXErrorDomain && nserr.code == 57 {
                // ignore socket not connected?
                // Returning here also skips the handleEvent call and the log entry below.
                return
            }
            DispatchQueue.main.async {
                self.isConnected = false
                self.isConnecting = false
                self.reconnect_with_backoff()
            }
        }
        DispatchQueue.main.async {
            self.handleEvent(.ws_event(event))
        }

        if let description = event.description {
            log?.add(description)
        }
    }

    /// Grows `backoff` by a factor of 1.5 and schedules a reconnect after that delay.
    func reconnect_with_backoff() {
        self.backoff *= 1.5
        self.reconnect_in(after: self.backoff)
    }

    /// Disconnects and connects again, unless a connection attempt is already in
    /// progress or the connection has been permanently disabled.
    func reconnect() {
        guard !isConnecting && !isDisabled else {
            return  // we're already trying to connect or we're disabled
        }
        disconnect()
        connect()
        log?.add("Reconnecting...")
    }

    /// Schedules `reconnect()` on the main queue after `after` seconds.
    func reconnect_in(after: TimeInterval) {
        // Capture weakly so a pending timer neither keeps a released connection
        // alive nor reconnects it after its owner has dropped it.
        DispatchQueue.main.asyncAfter(deadline: .now() + after) { [weak self] in
            self?.reconnect()
        }
    }

    /// Decodes an incoming websocket message and forwards it to `handleEvent`
    /// on the main queue. Binary messages are interpreted as UTF-8 text.
    private func receive(message: URLSessionWebSocketTask.Message) {
        switch message {
        case .string(let messageString):
            if let ev = decode_nostr_event(txt: messageString) {
                DispatchQueue.main.async {
                    self.handleEvent(.nostr_event(ev))
                }
                return
            }
            print("failed to decode event \(messageString)")
        case .data(let messageData):
            if let messageString = String(data: messageData, encoding: .utf8) {
                receive(message: .string(messageString))
            } else {
                // Previously dropped silently; log it so undecodable frames are visible.
                print("failed to decode binary message as UTF-8 (\(messageData.count) bytes)")
            }
        @unknown default:
            print("An unexpected URLSessionWebSocketTask.Message was received.")
        }
    }
}

/// Encodes a typed request into the string sent to the relay.
///
/// - Returns: The encoded message, or `nil` if encoding failed.
func make_nostr_req(_ req: NostrRequest) -> String? {
    switch req {
    case .subscribe(let sub):
        return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id)
    case .unsubscribe(let sub_id):
        return make_nostr_unsubscribe_req(sub_id)
    case .event(let ev):
        return make_nostr_push_event(ev: ev)
    case .auth(let ev):
        return make_nostr_auth_event(ev: ev)
    }
}

/// Builds `["AUTH",<event json>]`, or returns `nil` if `encode_json` fails.
/// The encoded message is also printed.
func make_nostr_auth_event(ev: NostrEvent) -> String? {
    guard let event = encode_json(ev) else {
        return nil
    }
    let encoded = "[\"AUTH\",\(event)]"
    print(encoded)
    return encoded
}

/// Builds `["EVENT",<event json>]`, or returns `nil` if `encode_json` fails.
/// The encoded message is also printed.
func make_nostr_push_event(ev: NostrEvent) -> String? {
    guard let event = encode_json(ev) else {
        return nil
    }
    let encoded = "[\"EVENT\",\(event)]"
    print(encoded)
    return encoded
}

/// Encodes `string` as a JSON string literal, surrounding quotes included.
///
/// Used for subscription ids so that quotes, backslashes and control characters
/// cannot break the surrounding JSON array.
private func nostr_json_string_literal(_ string: String) -> String? {
    let encoder = JSONEncoder()
    // Leave "/" unescaped so ids that needed no escaping are emitted unchanged.
    encoder.outputFormatting = .withoutEscapingSlashes
    guard let data = try? encoder.encode(string) else {
        return nil
    }
    return String(decoding: data, as: UTF8.self)
}

/// Builds `["CLOSE","<sub_id>"]` with `sub_id` JSON-escaped.
///
/// - Returns: The encoded message, or `nil` if `sub_id` could not be encoded.
func make_nostr_unsubscribe_req(_ sub_id: String) -> String? {
    guard let quoted_id = nostr_json_string_literal(sub_id) else {
        return nil
    }
    return "[\"CLOSE\",\(quoted_id)]"
}

/// Builds `["REQ","<sub_id>",<filter>,...]` with `sub_id` JSON-escaped and each
/// filter encoded by `JSONEncoder`.
///
/// - Returns: The encoded message, or `nil` if `sub_id` or any filter could not
///   be encoded.
func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? {
    guard let quoted_id = nostr_json_string_literal(sub_id) else {
        return nil
    }

    let encoder = JSONEncoder()
    var req = "[\"REQ\",\(quoted_id)"
    for filter in filters {
        guard let filter_json = try? encoder.encode(filter) else {
            return nil
        }
        req += ","
        req += String(decoding: filter_json, as: UTF8.self)
    }
    req += "]"
    return req
}