commit e7d32d9ea76f83a92edea1b2a3b8cb6b60714a20
parent 442a50f9ae8fbe9caadff2fcfd3fbef62a81c236
Author: William Casarin <jb55@jb55.com>
Date: Fri, 6 Jan 2023 10:18:31 -0800
pool: queue requests if we're disconnected
Changelog-Fixed: Don't spin forever if we're temporarily disconnected
Diffstat:
1 file changed, 57 insertions(+), 2 deletions(-)
diff --git a/damus/Nostr/RelayPool.swift b/damus/Nostr/RelayPool.swift
@@ -28,9 +28,20 @@ struct RelayHandler {
let callback: (String, NostrConnectionEvent) -> ()
}
+struct QueuedRequest {
+ let req: NostrRequest
+ let relay: String
+}
+
+struct NostrRequestId: Equatable, Hashable {
+ let relay: String?
+ let sub_id: String
+}
+
class RelayPool {
var relays: [Relay] = []
var handlers: [RelayHandler] = []
+ var request_queue: [QueuedRequest] = []
var descriptors: [RelayDescriptor] {
relays.map { $0.descriptor }
@@ -148,13 +159,38 @@ class RelayPool {
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
}
+ func count_queued(relay: String) -> Int {
+ var c = 0
+ for request in request_queue {
+ if request.relay == relay {
+ c += 1
+ }
+ }
+
+ return c
+ }
+
+ func queue_req(r: NostrRequest, relay: String) {
+ let count = count_queued(relay: relay)
+ guard count <= 10 else {
+ print("can't queue, too many queued events for \(relay)")
+ return
+ }
+
+ print("queueing request: \(r) for \(relay)")
+ request_queue.append(QueuedRequest(req: r, relay: relay))
+ }
+
func send(_ req: NostrRequest, to: [String]? = nil) {
let relays = to.map{ get_relays($0) } ?? self.relays
for relay in relays {
- if relay.connection.isConnected {
- relay.connection.send(req)
+ guard relay.connection.isConnected else {
+ queue_req(r: req, relay: relay.id)
+ continue
}
+
+ relay.connection.send(req)
}
}
@@ -193,9 +229,28 @@ class RelayPool {
}
}
+ func run_queue(_ relay_id: String) {
+ self.request_queue = request_queue.reduce(into: Array<QueuedRequest>()) { (q, req) in
+ guard req.relay == relay_id else {
+ q.append(req)
+ return
+ }
+
+ print("running queueing request: \(req.req) for \(relay)")
+ self.send(req.req, to: [relay_id])
+ }
+ }
+
func handle_event(relay_id: String, event: NostrConnectionEvent) {
record_last_pong(relay_id: relay_id, event: event)
+ // run req queue when we reconnect
+ if case .ws_event(let ws) = event {
+ if case .connected = ws {
+ run_queue(relay_id)
+ }
+ }
+
// handle reconnect logic, etc?
for handler in handlers {
handler.callback(relay_id, event)