transparent.rs (18863B)
1 use hashbrown::HashMap; 2 use uuid::Uuid; 3 4 use crate::{ 5 relay::{ 6 subscription::SubscriptionView, MetadataFilters, OutboxSubId, OutboxSubscriptions, 7 QueuedTasks, RelayReqId, RelayReqStatus, RelayTask, SubPass, SubPassGuardian, 8 SubPassRevocation, WebsocketRelay, 9 }, 10 ClientMessage, 11 }; 12 13 /// TransparentData tracks the outstanding transparent REQs and their metadata. 14 #[derive(Default)] 15 pub struct TransparentData { 16 request_to_sid: HashMap<OutboxSubId, RelayReqId>, 17 sid_status: HashMap<RelayReqId, SubData>, 18 queue: QueuedTasks, 19 } 20 21 impl TransparentData { 22 #[allow(dead_code)] 23 pub fn num_subs(&self) -> usize { 24 self.sid_status.len() 25 } 26 27 #[allow(dead_code)] 28 pub fn contains(&self, id: &OutboxSubId) -> bool { 29 self.request_to_sid.contains_key(id) 30 } 31 32 pub fn set_req_status(&mut self, sid: &str, status: RelayReqStatus) { 33 let Some(entry) = self.sid_status.get_mut(sid) else { 34 return; 35 }; 36 entry.status = status; 37 } 38 39 pub fn req_status(&self, req_id: &OutboxSubId) -> Option<RelayReqStatus> { 40 let sid = self.request_to_sid.get(req_id)?; 41 Some(self.sid_status.get(sid)?.status) 42 } 43 44 /// Returns the OutboxSubId associated with the given relay subscription ID. 45 pub fn id(&self, sid: &RelayReqId) -> Option<OutboxSubId> { 46 self.sid_status.get(sid).map(|d| d.sub_req_id) 47 } 48 } 49 50 pub struct TransparentRelay<'a> { 51 relay: Option<&'a mut WebsocketRelay>, 52 data: &'a mut TransparentData, 53 sub_guardian: &'a mut SubPassGuardian, 54 } 55 56 /// TransparentRelay manages per-subscription REQs for outbox subscriptions which 57 /// need to get EOSE ASAP (or some other need) 58 impl<'a> TransparentRelay<'a> { 59 pub fn new( 60 relay: Option<&'a mut WebsocketRelay>, 61 data: &'a mut TransparentData, 62 sub_guardian: &'a mut SubPassGuardian, 63 ) -> Self { 64 Self { 65 relay, 66 data, 67 sub_guardian, 68 } 69 } 70 71 pub fn try_flush_queue(&mut self, subs: &OutboxSubscriptions) { 72 while self.sub_guardian.available_passes() > 0 && !self.data.queue.is_empty() { 73 let Some(next) = self.data.queue.pop() else { 74 return; 75 }; 76 77 let Some(view) = subs.view(&next) else { 78 continue; 79 }; 80 81 self.subscribe(view); 82 } 83 } 84 85 pub fn subscribe(&mut self, view: SubscriptionView) { 86 let req_id = view.id; 87 let Some(existing_sid) = self.data.request_to_sid.get(&req_id) else { 88 let Some(new_pass) = self.sub_guardian.take_pass() else { 89 self.data.queue.add(req_id, RelayTask::Subscribe); 90 return; 91 }; 92 tracing::debug!("Transparent took pass for {req_id:?}"); 93 let sid: RelayReqId = Uuid::new_v4().into(); 94 self.data.request_to_sid.insert(req_id, sid.clone()); 95 send_req(&mut self.relay, &sid, view.filters); 96 self.data.sid_status.insert( 97 sid, 98 SubData { 99 status: RelayReqStatus::InitialQuery, 100 sub_pass: new_pass, 101 sub_req_id: req_id, 102 }, 103 ); 104 return; 105 }; 106 107 let Some(sub_data) = self.data.sid_status.get_mut(existing_sid) else { 108 return; 109 }; 110 111 // we're replacing the existing sub with new filters 112 sub_data.status = RelayReqStatus::InitialQuery; 113 114 send_req(&mut self.relay, existing_sid, view.filters); 115 } 116 117 pub fn unsubscribe(&mut self, req_id: OutboxSubId) { 118 let Some(sid) = self.data.request_to_sid.remove(&req_id) else { 119 self.data.queue.add(req_id, RelayTask::Unsubscribe); 120 return; 121 }; 122 123 let Some(removed) = self.data.sid_status.remove(&sid) else { 124 return; 125 }; 126 127 self.sub_guardian.return_pass(removed.sub_pass); 128 129 let Some(relay) = &mut self.relay else { 130 return; 131 }; 132 133 if relay.is_connected() { 134 relay.conn.send(&ClientMessage::close(sid.to_string())); 135 } 136 } 137 138 #[profiling::function] 139 pub fn handle_relay_open(&mut self, subs: &OutboxSubscriptions) { 140 let Some(relay) = &mut self.relay else { 141 return; 142 }; 143 144 if !relay.is_connected() { 145 return; 146 } 147 148 for (sid, data) in &self.data.sid_status { 149 let Some(view) = subs.view(&data.sub_req_id) else { 150 continue; 151 }; 152 153 relay.conn.send(&ClientMessage::req( 154 sid.to_string(), 155 view.filters.get_filters().clone(), 156 )); 157 } 158 } 159 } 160 161 fn send_req(relay: &mut Option<&mut WebsocketRelay>, sid: &RelayReqId, filters: &MetadataFilters) { 162 let Some(relay) = relay.as_mut() else { 163 return; 164 }; 165 166 if !relay.is_connected() { 167 return; 168 } 169 170 relay.conn.send(&ClientMessage::req( 171 sid.to_string(), 172 filters.get_filters().clone(), 173 )); 174 } 175 176 #[allow(dead_code)] 177 pub fn revocate_transparent_subs( 178 mut relay: Option<&mut WebsocketRelay>, 179 data: &mut TransparentData, 180 revocations: Vec<SubPassRevocation>, 181 ) { 182 // Snapshot the pairs we intend to process (can't mutate while iterating). 183 let pairs: Vec<(OutboxSubId, RelayReqId)> = data 184 .request_to_sid 185 .iter() 186 .take(revocations.len()) 187 .map(|(id, sid)| (*id, sid.clone())) 188 .collect(); 189 190 for (mut revocation, (id, sid)) in revocations.into_iter().zip(pairs) { 191 // If we fail to remove the mapping, skip without consuming other state. 192 if data.request_to_sid.remove(&id).is_none() { 193 continue; 194 } 195 196 let Some(status) = data.sid_status.remove(&sid) else { 197 continue; 198 }; 199 200 revocation.revocate(status.sub_pass); 201 data.queue.add(id, RelayTask::Subscribe); 202 203 let Some(relay) = &mut relay else { 204 continue; 205 }; 206 207 if relay.is_connected() { 208 relay.conn.send(&ClientMessage::close(sid.to_string())); 209 } 210 } 211 } 212 213 struct SubData { 214 pub status: RelayReqStatus, 215 pub sub_pass: SubPass, 216 pub sub_req_id: OutboxSubId, 217 } 218 219 #[cfg(test)] 220 mod tests { 221 use super::*; 222 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 223 use hashbrown::HashSet; 224 use nostrdb::Filter; 225 226 // ==================== TransparentData tests ==================== 227 228 fn trivial_filter() -> Vec<Filter> { 229 vec![Filter::new().kinds([0]).build()] 230 } 231 232 fn create_subs_with_filter(id: OutboxSubId, filters: Vec<Filter>) -> OutboxSubscriptions { 233 let mut subs = OutboxSubscriptions::default(); 234 insert_sub(&mut subs, id, filters, false); 235 subs 236 } 237 238 fn insert_sub( 239 subs: &mut OutboxSubscriptions, 240 id: OutboxSubId, 241 filters: Vec<Filter>, 242 is_oneshot: bool, 243 ) { 244 subs.new_subscription( 245 id, 246 SubscribeTask { 247 filters, 248 relays: RelayUrlPkgs::new(HashSet::new()), 249 }, 250 is_oneshot, 251 ); 252 } 253 254 #[test] 255 fn transparent_data_manual_insert_and_query() { 256 let mut data = TransparentData::default(); 257 let mut guardian = SubPassGuardian::new(1); 258 let pass = guardian.take_pass().unwrap(); 259 260 let req_id = OutboxSubId(42); 261 let sid = RelayReqId::default(); 262 263 data.request_to_sid.insert(req_id, sid.clone()); 264 data.sid_status.insert( 265 sid.clone(), 266 SubData { 267 status: RelayReqStatus::InitialQuery, 268 sub_pass: pass, 269 sub_req_id: req_id, 270 }, 271 ); 272 273 assert!(data.contains(&req_id)); 274 assert_eq!(data.num_subs(), 1); 275 assert_eq!(data.req_status(&req_id), Some(RelayReqStatus::InitialQuery)); 276 277 // Update status 278 data.set_req_status(&sid.to_string(), RelayReqStatus::Eose); 279 assert_eq!(data.req_status(&req_id), Some(RelayReqStatus::Eose)); 280 } 281 282 // ==================== TransparentRelay tests ==================== 283 284 #[test] 285 fn transparent_relay_subscribe_creates_mapping() { 286 let mut data = TransparentData::default(); 287 let mut guardian = SubPassGuardian::new(5); 288 let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter()); 289 290 { 291 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 292 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 293 } 294 295 assert!(data.contains(&OutboxSubId(0))); 296 assert_eq!(data.num_subs(), 1); 297 assert_eq!(guardian.available_passes(), 4); // One pass consumed 298 } 299 300 #[test] 301 fn transparent_relay_subscribe_queues_when_no_passes() { 302 let mut data = TransparentData::default(); 303 let mut guardian = SubPassGuardian::new(0); // No passes available 304 let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter()); 305 306 { 307 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 308 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 309 } 310 311 // Should be queued, not active 312 assert!(!data.contains(&OutboxSubId(0))); 313 assert_eq!(data.num_subs(), 0); 314 assert_eq!(data.queue.len(), 1); 315 } 316 317 #[test] 318 fn transparent_relay_unsubscribe_returns_pass() { 319 let mut data = TransparentData::default(); 320 let mut guardian = SubPassGuardian::new(1); 321 let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter()); 322 323 { 324 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 325 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 326 } 327 328 assert_eq!(guardian.available_passes(), 0); 329 assert!(data.queue.is_empty()); 330 331 { 332 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 333 relay.unsubscribe(OutboxSubId(0)); 334 } 335 336 assert_eq!(guardian.available_passes(), 1); 337 assert!(!data.contains(&OutboxSubId(0))); 338 assert_eq!(data.num_subs(), 0); 339 assert!(data.queue.is_empty()); 340 } 341 342 #[test] 343 fn transparent_relay_sub_unsub_no_passes() { 344 let mut data = TransparentData::default(); 345 346 // no passes available 347 let mut guardian = SubPassGuardian::new(0); 348 let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter()); 349 350 { 351 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 352 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 353 } 354 355 assert!(!data.queue.is_empty()); 356 357 { 358 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 359 relay.unsubscribe(OutboxSubId(0)); 360 } 361 362 assert!(data.queue.is_empty()); 363 } 364 365 #[test] 366 fn transparent_relay_unsubscribe_unknown_no_op() { 367 let mut data = TransparentData::default(); 368 let mut guardian = SubPassGuardian::new(5); 369 370 { 371 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 372 relay.unsubscribe(OutboxSubId(999)); // Unknown ID 373 } 374 375 // Should not panic, passes unchanged 376 assert_eq!(guardian.available_passes(), 5); 377 } 378 379 #[test] 380 fn transparent_relay_subscribe_replaces_existing() { 381 let mut data = TransparentData::default(); 382 let mut guardian = SubPassGuardian::new(5); 383 384 let filters1 = vec![Filter::new().kinds(vec![1]).build()]; 385 let filters2 = vec![Filter::new().kinds(vec![4]).build()]; 386 387 let subs1 = create_subs_with_filter(OutboxSubId(0), filters1); 388 389 { 390 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 391 relay.subscribe(subs1.view(&OutboxSubId(0)).unwrap()); 392 } 393 394 assert_eq!(guardian.available_passes(), 4); 395 396 let subs2 = create_subs_with_filter(OutboxSubId(0), filters2); 397 398 { 399 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 400 relay.subscribe(subs2.view(&OutboxSubId(0)).unwrap()); 401 } 402 403 // Should still have same number of passes (replaced, not added) 404 assert_eq!(guardian.available_passes(), 4); 405 assert_eq!(data.num_subs(), 1); 406 407 // Verify replacement happened - status should be reset to InitialQuery 408 let sid = data.request_to_sid.get(&OutboxSubId(0)).unwrap(); 409 let sub_data = data.sid_status.get(sid).unwrap(); 410 assert_eq!(sub_data.status, RelayReqStatus::InitialQuery); 411 } 412 413 #[test] 414 fn transparent_relay_try_flush_queue_processes_when_passes_available() { 415 let mut data = TransparentData::default(); 416 let mut guardian = SubPassGuardian::new(0); // Start with no passes 417 let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter()); 418 419 // Queue a subscription 420 { 421 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 422 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 423 } 424 425 assert_eq!(data.queue.len(), 1); 426 assert!(!data.contains(&OutboxSubId(0))); 427 428 // Return a pass 429 guardian.spawn_passes(1); 430 431 // Flush queue 432 { 433 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 434 relay.try_flush_queue(&subs); 435 } 436 437 // Should now be active 438 assert!(data.queue.is_empty()); 439 assert!(data.contains(&OutboxSubId(0))); 440 } 441 442 #[test] 443 fn transparent_relay_multiple_subscriptions() { 444 let mut data = TransparentData::default(); 445 let mut guardian = SubPassGuardian::new(3); 446 let mut subs = OutboxSubscriptions::default(); 447 insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), false); 448 insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false); 449 insert_sub(&mut subs, OutboxSubId(2), trivial_filter(), false); 450 451 { 452 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 453 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 454 relay.subscribe(subs.view(&OutboxSubId(1)).unwrap()); 455 relay.subscribe(subs.view(&OutboxSubId(2)).unwrap()); 456 } 457 458 assert_eq!(data.num_subs(), 3); 459 assert_eq!(guardian.available_passes(), 0); 460 461 // All should be tracked 462 assert!(data.contains(&OutboxSubId(0))); 463 assert!(data.contains(&OutboxSubId(1))); 464 assert!(data.contains(&OutboxSubId(2))); 465 } 466 467 #[test] 468 fn transparent_data_id_returns_outbox_sub_id() { 469 let mut data = TransparentData::default(); 470 let mut guardian = SubPassGuardian::new(2); 471 let mut subs = OutboxSubscriptions::default(); 472 insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), true); 473 insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false); 474 475 { 476 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 477 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 478 relay.subscribe(subs.view(&OutboxSubId(1)).unwrap()); 479 } 480 481 let sid = data.request_to_sid.get(&OutboxSubId(0)).unwrap().clone(); 482 483 // id() should return the OutboxSubId for the relay subscription 484 let outbox_id = data.id(&sid); 485 assert_eq!(outbox_id, Some(OutboxSubId(0))); 486 487 // Unknown sid should return None 488 let unknown_sid = RelayReqId::from("unknown"); 489 assert!(data.id(&unknown_sid).is_none()); 490 } 491 492 // ==================== revocate_transparent_subs tests ==================== 493 494 #[test] 495 fn revocate_transparent_subs_removes_subscriptions() { 496 let mut data = TransparentData::default(); 497 let mut guardian = SubPassGuardian::new(3); 498 let mut subs = OutboxSubscriptions::default(); 499 insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), false); 500 insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false); 501 insert_sub(&mut subs, OutboxSubId(2), trivial_filter(), false); 502 503 // Set up some subscriptions 504 { 505 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 506 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 507 relay.subscribe(subs.view(&OutboxSubId(1)).unwrap()); 508 relay.subscribe(subs.view(&OutboxSubId(2)).unwrap()); 509 } 510 511 assert_eq!(data.num_subs(), 3); 512 513 // Create revocations for 2 subs 514 let revocations = vec![SubPassRevocation::new(), SubPassRevocation::new()]; 515 516 revocate_transparent_subs(None, &mut data, revocations); 517 518 // Should have removed 2 subscriptions 519 assert_eq!(data.num_subs(), 1); 520 assert_eq!(data.queue.len(), 2); 521 } 522 523 #[test] 524 fn revocate_transparent_subs_empty_revocations() { 525 let mut data = TransparentData::default(); 526 let mut guardian = SubPassGuardian::new(2); 527 let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter()); 528 529 { 530 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 531 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 532 } 533 534 // No revocations 535 let revocations: Vec<SubPassRevocation> = vec![]; 536 revocate_transparent_subs(None, &mut data, revocations); 537 538 // Nothing should change 539 assert_eq!(data.num_subs(), 1); 540 } 541 542 #[test] 543 fn revocate_transparent_subs_exactly_matching() { 544 // Test with exactly matching number of revocations and subscriptions 545 let mut data = TransparentData::default(); 546 let mut guardian = SubPassGuardian::new(3); 547 let mut subs = OutboxSubscriptions::default(); 548 insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), false); 549 insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false); 550 insert_sub(&mut subs, OutboxSubId(2), trivial_filter(), false); 551 552 // Create 3 subscriptions 553 { 554 let mut relay = TransparentRelay::new(None, &mut data, &mut guardian); 555 relay.subscribe(subs.view(&OutboxSubId(0)).unwrap()); 556 relay.subscribe(subs.view(&OutboxSubId(1)).unwrap()); 557 relay.subscribe(subs.view(&OutboxSubId(2)).unwrap()); 558 } 559 560 assert_eq!(data.num_subs(), 3); 561 assert_eq!(guardian.available_passes(), 0); 562 563 // Create exactly 3 revocations 564 let revocations = vec![ 565 SubPassRevocation::new(), 566 SubPassRevocation::new(), 567 SubPassRevocation::new(), 568 ]; 569 570 // This should revoke all subscriptions 571 revocate_transparent_subs(None, &mut data, revocations); 572 573 assert_eq!(data.num_subs(), 0); 574 assert_eq!(data.queue.len(), 3); 575 } 576 }