coordinator.rs (19027B)
1 use ewebsock::{WsEvent, WsMessage}; 2 use hashbrown::{HashMap, HashSet}; 3 4 use crate::{ 5 relay::{ 6 compaction::{CompactionData, CompactionRelay, CompactionSession}, 7 transparent::{revocate_transparent_subs, TransparentData, TransparentRelay}, 8 BroadcastCache, BroadcastRelay, NormRelayUrl, OutboxSubId, OutboxSubscriptions, 9 RawEventData, RelayCoordinatorLimits, RelayImplType, RelayLimitations, RelayReqId, 10 RelayReqStatus, RelayType, SubPassGuardian, SubPassRevocation, WebsocketRelay, 11 }, 12 EventClientMessage, RelayMessage, RelayStatus, Wakeup, WebsocketConn, 13 }; 14 15 /// RelayCoordinator routes each Outbox subscription to either the compaction or 16 /// transparent relay engine and tracks their status. 17 pub struct CoordinationData { 18 limits: RelayCoordinatorLimits, 19 pub(crate) websocket: Option<WebsocketRelay>, 20 coordination: HashMap<OutboxSubId, RelayType>, 21 compaction_data: CompactionData, 22 transparent_data: TransparentData, // for outbox subs that prefer to be transparent 23 broadcast_cache: BroadcastCache, 24 eose_queue: Vec<RelayReqId>, 25 } 26 27 impl CoordinationData { 28 pub fn new<W>(limits: RelayLimitations, norm_url: NormRelayUrl, wakeup: W) -> Self 29 where 30 W: Wakeup, 31 { 32 let websocket = match WebsocketConn::from_wakeup(norm_url.clone().into(), wakeup) { 33 Ok(w) => Some(WebsocketRelay::new(w)), 34 Err(e) => { 35 tracing::error!("could not open websocket to {norm_url:?}: {e}"); 36 None 37 } 38 }; 39 let limits = RelayCoordinatorLimits::new(limits); 40 let compaction_data = CompactionData::default(); 41 Self { 42 limits, 43 websocket, 44 compaction_data, 45 transparent_data: TransparentData::default(), 46 coordination: Default::default(), 47 broadcast_cache: Default::default(), 48 eose_queue: Vec::new(), 49 } 50 } 51 52 /// Change if we found a new NIP-11 `max_subscriptions` 53 #[allow(dead_code)] 54 pub fn set_max_size(&mut self, subs: &OutboxSubscriptions, max_size: usize) { 55 let Some(revocations) = self.limits.new_total(max_size) else { 56 return; 57 }; 58 59 let mut trans_left = self.transparent_data.num_subs(); 60 let mut compact_left = self.compaction_data.num_subs(); 61 62 let (trans_revocations, compacts_revocations): ( 63 Vec<SubPassRevocation>, 64 Vec<SubPassRevocation>, 65 ) = revocations.into_iter().partition(|_| { 66 let take_trans = (trans_left > compact_left && trans_left > 0) || (compact_left == 0); 67 68 if take_trans { 69 trans_left -= 1; 70 } else { 71 compact_left -= 1; 72 } 73 take_trans 74 }); 75 76 if !trans_revocations.is_empty() { 77 revocate_transparent_subs( 78 self.websocket.as_mut(), 79 &mut self.transparent_data, 80 trans_revocations, 81 ); 82 } 83 84 if !compacts_revocations.is_empty() { 85 CompactionRelay::new( 86 self.websocket.as_mut(), 87 &mut self.compaction_data, 88 self.limits.max_json_bytes, 89 &mut self.limits.sub_guardian, 90 subs, 91 ) 92 .revocate_all(compacts_revocations); 93 } 94 } 95 96 #[profiling::function] 97 pub fn ingest_session( 98 &mut self, 99 subs: &OutboxSubscriptions, 100 session: CoordinationSession, 101 ) -> EoseIds { 102 let mut trans_unsubs: HashSet<OutboxSubId> = HashSet::new(); 103 let mut trans = HashSet::new(); 104 let mut compaction_session = CompactionSession::default(); 105 let mut eose_ids = EoseIds::default(); 106 107 for (id, task) in session.tasks { 108 match task { 109 CoordinationTask::TransparentSub => { 110 if let Some(RelayType::Compaction) = self.coordination.get(&id) { 111 compaction_session.unsub(id); 112 } 113 self.coordination.insert(id, RelayType::Transparent); 114 trans.insert(id); 115 } 116 CoordinationTask::CompactionSub => { 117 if let Some(RelayType::Transparent) = self.coordination.get(&id) { 118 trans_unsubs.insert(id); 119 } 120 self.coordination.insert(id, RelayType::Compaction); 121 compaction_session.sub(id); 122 } 123 CoordinationTask::Unsubscribe => { 124 let Some(rtype) = self.coordination.remove(&id) else { 125 continue; 126 }; 127 128 match rtype { 129 RelayType::Compaction => { 130 compaction_session.unsub(id); 131 } 132 RelayType::Transparent => { 133 trans_unsubs.insert(id); 134 } 135 } 136 } 137 } 138 } 139 140 // Drain EOSE queue and collect IDs 141 for sid in self.eose_queue.drain(..) { 142 // Try compaction first 143 let Some(compaction_ids) = self.compaction_data.ids(&sid) else { 144 let Some(transparent_id) = self.transparent_data.id(&sid) else { 145 continue; 146 }; 147 148 if subs.is_oneshot(&transparent_id) { 149 trans_unsubs.insert(transparent_id); 150 eose_ids.oneshots.insert(transparent_id); 151 } else { 152 eose_ids.normal.insert(transparent_id); 153 } 154 continue; 155 }; 156 157 let oneshots = subs.subset_oneshot(compaction_ids); 158 159 for id in compaction_ids { 160 if oneshots.contains(id) { 161 compaction_session.unsub(*id); 162 eose_ids.oneshots.insert(*id); 163 } else { 164 eose_ids.normal.insert(*id); 165 } 166 } 167 } 168 169 if !trans_unsubs.is_empty() { 170 let mut transparent = TransparentRelay::new( 171 self.websocket.as_mut(), 172 &mut self.transparent_data, 173 &mut self.limits.sub_guardian, 174 ); 175 for unsub in trans_unsubs { 176 transparent.unsubscribe(unsub); 177 } 178 } 179 180 if !trans.is_empty() { 181 compaction_session.request_free_subs(trans.len()); 182 } 183 184 if !compaction_session.is_empty() { 185 CompactionRelay::new( 186 self.websocket.as_mut(), 187 &mut self.compaction_data, 188 self.limits.max_json_bytes, 189 &mut self.limits.sub_guardian, 190 subs, 191 ) 192 .ingest_session(compaction_session); 193 } 194 195 let mut transparent = TransparentRelay::new( 196 self.websocket.as_mut(), 197 &mut self.transparent_data, 198 &mut self.limits.sub_guardian, 199 ); 200 for id in trans { 201 let Some(view) = subs.view(&id) else { 202 continue; 203 }; 204 transparent.subscribe(view); 205 } 206 207 transparent.try_flush_queue(subs); 208 tracing::trace!( 209 "Using {} of {} subs", 210 self.limits.sub_guardian.total_passes() - self.limits.sub_guardian.available_passes(), 211 self.limits.sub_guardian.total_passes() 212 ); 213 214 eose_ids 215 } 216 217 pub fn send_event(&mut self, msg: EventClientMessage) { 218 BroadcastRelay::websocket(self.websocket.as_mut(), &mut self.broadcast_cache) 219 .broadcast(msg); 220 } 221 222 #[allow(dead_code)] 223 pub fn set_req_status(&mut self, sid: &str, status: RelayReqStatus) { 224 // the compaction & transparent data only act on sids that they already know, so whichever 225 // this sid belongs to, it'll make it to its rightful home 226 self.compaction_data.set_req_status(sid, status); 227 self.transparent_data.set_req_status(sid, status); 228 } 229 230 pub fn req_status(&self, id: &OutboxSubId) -> Option<RelayReqStatus> { 231 match self.coordination.get(id)? { 232 RelayType::Compaction => self.compaction_data.req_status(id), 233 RelayType::Transparent => self.transparent_data.req_status(id), 234 } 235 } 236 237 #[allow(dead_code)] 238 pub fn has_req_status(&self, id: &OutboxSubId, status: RelayReqStatus) -> bool { 239 self.req_status(id) == Some(status) 240 } 241 242 fn url(&self) -> &str { 243 let Some(websocket) = &self.websocket else { 244 return ""; 245 }; 246 websocket.conn.url.as_str() 247 } 248 249 // whether we received 250 #[profiling::function] 251 pub(crate) fn try_recv<F>(&mut self, subs: &OutboxSubscriptions, act: &mut F) -> RecvResponse 252 where 253 for<'a> F: FnMut(RawEventData<'a>), 254 { 255 let Some(websocket) = self.websocket.as_mut() else { 256 return RecvResponse::default(); 257 }; 258 259 let event = { 260 profiling::scope!("webscket try_recv"); 261 262 let Some(event) = websocket.conn.receiver.try_recv() else { 263 return RecvResponse::default(); 264 }; 265 event 266 }; 267 268 let msg = match &event { 269 WsEvent::Opened => { 270 websocket.conn.set_status(RelayStatus::Connected); 271 websocket.reconnect_attempt = 0; 272 websocket.retry_connect_after = WebsocketRelay::initial_reconnect_duration(); 273 handle_relay_open( 274 websocket, 275 &mut self.broadcast_cache, 276 &mut self.compaction_data, 277 &mut self.transparent_data, 278 self.limits.max_json_bytes, 279 &mut self.limits.sub_guardian, 280 subs, 281 ); 282 None 283 } 284 WsEvent::Closed => { 285 websocket.conn.set_status(RelayStatus::Disconnected); 286 None 287 } 288 WsEvent::Error(err) => { 289 tracing::error!("relay {} error: {:?}", websocket.conn.url, err); 290 websocket.conn.set_status(RelayStatus::Disconnected); 291 None 292 } 293 WsEvent::Message(ws_message) => match ws_message { 294 #[cfg(not(target_arch = "wasm32"))] 295 WsMessage::Ping(bs) => { 296 websocket.conn.sender.send(WsMessage::Pong(bs.clone())); 297 None 298 } 299 WsMessage::Text(text) => { 300 tracing::trace!("relay {} received text: {}", websocket.conn.url, text); 301 match RelayMessage::from_json(text) { 302 Ok(msg) => Some(msg), 303 Err(err) => { 304 tracing::error!( 305 "relay {} message decode error: {:?}", 306 websocket.conn.url, 307 err 308 ); 309 None 310 } 311 } 312 } 313 _ => None, 314 }, 315 }; 316 317 let mut resp = RecvResponse::received(); 318 let Some(msg) = msg else { 319 return resp; 320 }; 321 322 match msg { 323 RelayMessage::OK(cr) => tracing::info!("OK {:?}", cr), 324 RelayMessage::Eose(sid) => { 325 tracing::debug!("Relay {} received EOSE for subscription: {sid}", self.url()); 326 self.compaction_data 327 .set_req_status(sid, RelayReqStatus::Eose); 328 self.transparent_data 329 .set_req_status(sid, RelayReqStatus::Eose); 330 self.eose_queue.push(RelayReqId(sid.to_string())); 331 } 332 RelayMessage::Event(_, ev) => { 333 profiling::scope!("ingest event"); 334 resp.event_was_nostr_note = true; 335 act(RawEventData { 336 url: websocket.conn.url.as_str(), 337 event_json: ev, 338 relay_type: RelayImplType::Websocket, 339 }); 340 } 341 RelayMessage::Notice(msg) => { 342 tracing::warn!("Notice from {}: {}", self.url(), msg) 343 } 344 RelayMessage::Closed(sid, _) => { 345 tracing::trace!("Relay {} received CLOSED: {sid}", self.url()); 346 self.compaction_data 347 .set_req_status(sid, RelayReqStatus::Closed); 348 self.transparent_data 349 .set_req_status(sid, RelayReqStatus::Closed); 350 } 351 } 352 353 resp 354 } 355 } 356 357 #[derive(Default)] 358 pub struct RecvResponse { 359 pub received_event: bool, 360 pub event_was_nostr_note: bool, 361 } 362 363 impl RecvResponse { 364 pub fn received() -> Self { 365 RecvResponse { 366 received_event: true, 367 event_was_nostr_note: false, 368 } 369 } 370 } 371 372 #[derive(Default)] 373 pub struct EoseIds { 374 pub oneshots: HashSet<OutboxSubId>, 375 pub normal: HashSet<OutboxSubId>, 376 } 377 378 impl EoseIds { 379 /// Merges IDs from `other` into `self`, preserving set uniqueness. 380 pub fn absorb(&mut self, other: EoseIds) { 381 self.oneshots.extend(other.oneshots); 382 self.normal.extend(other.normal); 383 } 384 } 385 386 fn handle_relay_open( 387 websocket: &mut WebsocketRelay, 388 broadcast_cache: &mut BroadcastCache, 389 compaction: &mut CompactionData, 390 transparent: &mut TransparentData, 391 max_json: usize, 392 guardian: &mut SubPassGuardian, 393 subs: &OutboxSubscriptions, 394 ) { 395 BroadcastRelay::websocket(Some(websocket), broadcast_cache).try_flush_queue(); 396 let mut transparent = TransparentRelay::new(Some(websocket), transparent, guardian); 397 transparent.handle_relay_open(subs); 398 let mut compaction = 399 CompactionRelay::new(Some(websocket), compaction, max_json, guardian, subs); 400 compaction.handle_relay_open(); 401 } 402 403 #[derive(Default)] 404 pub struct CoordinationSession { 405 pub tasks: HashMap<OutboxSubId, CoordinationTask>, 406 } 407 408 pub enum CoordinationTask { 409 TransparentSub, 410 CompactionSub, 411 Unsubscribe, 412 } 413 414 impl CoordinationSession { 415 pub fn subscribe(&mut self, id: OutboxSubId, use_transparent: bool) { 416 self.tasks.insert( 417 id, 418 if use_transparent { 419 CoordinationTask::TransparentSub 420 } else { 421 CoordinationTask::CompactionSub 422 }, 423 ); 424 } 425 426 pub fn unsubscribe(&mut self, id: OutboxSubId) { 427 self.tasks.insert(id, CoordinationTask::Unsubscribe); 428 } 429 } 430 431 #[cfg(test)] 432 mod tests { 433 use super::*; 434 435 /// Returns the task held for `id`, panicking when no matching task exists. 436 #[track_caller] 437 fn expect_task<'a>(session: &'a CoordinationSession, id: OutboxSubId) -> &'a CoordinationTask { 438 session 439 .tasks 440 .get(&id) 441 .unwrap_or_else(|| panic!("Expected task for {:?}", id)) 442 } 443 444 // ==================== CoordinationSession tests ==================== 445 446 /// Newly created coordination sessions hold no tasks. 447 #[test] 448 fn coordination_session_default_empty() { 449 let session = CoordinationSession::default(); 450 assert!(session.tasks.is_empty()); 451 } 452 453 /// Transparent subscriptions should be recorded as TransparentSub tasks. 454 #[test] 455 fn coordination_session_subscribe_transparent() { 456 let mut session = CoordinationSession::default(); 457 458 session.subscribe(OutboxSubId(0), true); // use_transparent = true 459 460 assert!(matches!( 461 expect_task(&session, OutboxSubId(0)), 462 CoordinationTask::TransparentSub 463 )); 464 } 465 466 /// Compaction mode subscriptions should be recorded as CompactionSub tasks. 467 #[test] 468 fn coordination_session_subscribe_compaction() { 469 let mut session = CoordinationSession::default(); 470 471 session.subscribe(OutboxSubId(0), false); // use_transparent = false means compaction 472 473 assert!(matches!( 474 expect_task(&session, OutboxSubId(0)), 475 CoordinationTask::CompactionSub 476 )); 477 } 478 479 /// Unsubscribe should record an Unsubscribe task. 480 #[test] 481 fn coordination_session_unsubscribe() { 482 let mut session = CoordinationSession::default(); 483 484 session.unsubscribe(OutboxSubId(42)); 485 486 assert!(matches!( 487 expect_task(&session, OutboxSubId(42)), 488 CoordinationTask::Unsubscribe 489 )); 490 } 491 492 /// Subsequent subscribe calls should overwrite previous modes. 493 #[test] 494 fn coordination_session_subscribe_overwrites_previous() { 495 let mut session = CoordinationSession::default(); 496 497 // First subscribe as transparent 498 session.subscribe(OutboxSubId(0), true); 499 500 assert!(matches!( 501 expect_task(&session, OutboxSubId(0)), 502 CoordinationTask::TransparentSub 503 )); 504 505 // Then as compaction 506 session.subscribe(OutboxSubId(0), false); 507 508 // Should be compaction now 509 assert!(matches!( 510 expect_task(&session, OutboxSubId(0)), 511 CoordinationTask::CompactionSub 512 )); 513 } 514 515 /// Unsubscribe should override any prior subscribe entries. 516 #[test] 517 fn coordination_session_unsubscribe_overwrites_subscribe() { 518 let mut session = CoordinationSession::default(); 519 520 session.subscribe(OutboxSubId(0), true); 521 assert!(matches!( 522 expect_task(&session, OutboxSubId(0)), 523 CoordinationTask::TransparentSub 524 )); 525 session.unsubscribe(OutboxSubId(0)); 526 527 assert!(matches!( 528 expect_task(&session, OutboxSubId(0)), 529 CoordinationTask::Unsubscribe 530 )); 531 } 532 533 /// Multiple tasks can be recorded in a single session. 534 #[test] 535 fn coordination_session_multiple_tasks() { 536 let mut session = CoordinationSession::default(); 537 538 session.subscribe(OutboxSubId(0), true); 539 session.subscribe(OutboxSubId(1), false); 540 session.unsubscribe(OutboxSubId(2)); 541 542 assert_eq!(session.tasks.len(), 3); 543 } 544 545 // ==================== EoseIds tests ==================== 546 547 #[test] 548 fn eose_ids_default_empty() { 549 let eose_ids = EoseIds::default(); 550 assert!(eose_ids.oneshots.is_empty()); 551 assert!(eose_ids.normal.is_empty()); 552 } 553 554 /// absorb merges oneshot and normal ID sets into the target accumulator. 555 #[test] 556 fn eose_ids_absorb_merges_both_sets() { 557 let mut acc = EoseIds::default(); 558 let mut incoming = EoseIds::default(); 559 560 acc.oneshots.insert(OutboxSubId(1)); 561 incoming.oneshots.insert(OutboxSubId(2)); 562 incoming.normal.insert(OutboxSubId(3)); 563 564 acc.absorb(incoming); 565 566 assert!(acc.oneshots.contains(&OutboxSubId(1))); 567 assert!(acc.oneshots.contains(&OutboxSubId(2))); 568 assert!(acc.normal.contains(&OutboxSubId(3))); 569 } 570 }