compaction.rs (32467B)
1 use std::collections::HashMap; 2 3 use hashbrown::HashSet; 4 use nostrdb::Filter; 5 6 use crate::{ 7 relay::{ 8 websocket::WebsocketRelay, OutboxSubId, OutboxSubscriptions, QueuedTasks, RelayReqId, 9 RelayReqStatus, RelayTask, SubPass, SubPassGuardian, SubPassRevocation, 10 }, 11 ClientMessage, 12 }; 13 14 /// CompactionData tracks every compaction REQ on a relay along with the 15 /// Outbox sub ids routed into it. 16 #[derive(Default)] 17 pub struct CompactionData { 18 request_to_sid: HashMap<OutboxSubId, RelayReqId>, // we never split outbox subs over multiple REQs 19 relay_subs: HashMap<RelayReqId, RelaySubData>, // UUID 20 queue: QueuedTasks, 21 } 22 23 impl CompactionData { 24 #[allow(dead_code)] 25 pub fn num_subs(&self) -> usize { 26 self.relay_subs.len() 27 } 28 29 pub fn set_req_status(&mut self, sid: &str, status: RelayReqStatus) { 30 let Some(data) = self.relay_subs.get_mut(sid) else { 31 return; 32 }; 33 34 data.status = status; 35 } 36 37 pub fn req_status(&self, id: &OutboxSubId) -> Option<RelayReqStatus> { 38 let sid = self.request_to_sid.get(id)?; 39 let data = self.relay_subs.get(sid)?; 40 Some(data.status) 41 } 42 43 #[allow(dead_code)] 44 pub fn has_eose(&self, id: &OutboxSubId) -> bool { 45 self.req_status(id) == Some(RelayReqStatus::Eose) 46 } 47 48 /// Returns the OutboxSubIds associated with the given relay subscription ID. 49 pub fn ids(&self, sid: &RelayReqId) -> Option<&HashSet<OutboxSubId>> { 50 self.relay_subs.get(sid).map(|d| &d.requests.requests) 51 } 52 } 53 54 /// Ensures `max_subs` REQ to the websocket relay by "compacting" subscriptions (combining multiple requests into one) 55 pub struct CompactionRelay<'a> { 56 ctx: CompactionCtx<'a>, 57 sub_guardian: &'a mut SubPassGuardian, 58 json_limit: usize, 59 } 60 61 /// CompactionRelay ensures multiple Outbox subscriptions are packed into as few 62 /// REQs as possible, respecting per-relay limits. 63 impl<'a> CompactionRelay<'a> { 64 pub fn new( 65 relay: Option<&'a mut WebsocketRelay>, 66 data: &'a mut CompactionData, 67 json_limit: usize, 68 sub_guardian: &'a mut SubPassGuardian, 69 subs: &'a OutboxSubscriptions, 70 ) -> Self { 71 let ctx = match relay { 72 Some(relay) => CompactionCtx::Active(CompactionHandler::new(relay, data, subs)), 73 None => CompactionCtx::Inactive { 74 data, 75 session: CompactionSubSession::default(), 76 subs, 77 }, 78 }; 79 Self { 80 ctx, 81 sub_guardian, 82 json_limit, 83 } 84 } 85 86 #[profiling::function] 87 pub fn ingest_session(mut self, session: CompactionSession) { 88 let request_free = session.request_free; 89 let mut reserved: Vec<SubPass> = Vec::new(); 90 91 // Reserve passes - take from guardian or compact to free them 92 while reserved.len() < request_free { 93 if let Some(pass) = self.sub_guardian.take_pass() { 94 reserved.push(pass); 95 } else if let Some(ejected_pass) = self.compact() { 96 reserved.push(ejected_pass); 97 } else { 98 break; 99 } 100 } 101 102 // Process session (can't touch reserved passes) 103 self.ingest_session_internal(session); 104 105 // Drain queue 106 { 107 profiling::scope!("drain queue"); 108 loop { 109 let Some(id) = self.ctx.data().queue.pop() else { 110 break; 111 }; 112 if self.subscribe(id) == PlaceResult::Queued { 113 break; 114 } 115 } 116 } 117 118 // Return reserved passes 119 for pass in reserved { 120 self.sub_guardian.return_pass(pass); 121 } 122 } 123 124 #[profiling::function] 125 fn ingest_session_internal(&mut self, session: CompactionSession) { 126 for (id, task) in session.tasks { 127 match task { 128 RelayTask::Unsubscribe => { 129 self.unsubscribe(id); 130 } 131 RelayTask::Subscribe => { 132 self.subscribe(id); 133 } 134 } 135 } 136 } 137 138 #[profiling::function] 139 pub fn handle_relay_open(&mut self) { 140 let CompactionCtx::Active(handler) = &mut self.ctx else { 141 return; 142 }; 143 144 if !handler.relay.is_connected() { 145 return; 146 } 147 148 for (sid, sub_data) in &handler.data.relay_subs { 149 let filters = handler.subs.filters_all(&sub_data.requests.requests); 150 if are_filters_empty(&filters) { 151 continue; 152 } 153 154 handler 155 .relay 156 .conn 157 .send(&ClientMessage::req(sid.to_string(), filters)); 158 } 159 } 160 161 #[allow(dead_code)] 162 pub fn revocate(&mut self, mut revocation: SubPassRevocation) { 163 let Some(pass) = self.compact() else { 164 // this shouldn't be possible 165 return; 166 }; 167 168 revocation.revocate(pass); 169 } 170 171 #[allow(dead_code)] 172 pub fn revocate_all(&mut self, revocations: Vec<SubPassRevocation>) { 173 for revocation in revocations { 174 self.revocate(revocation); 175 } 176 } 177 178 #[profiling::function] 179 fn compact(&mut self) -> Option<SubPass> { 180 let SharedCtx { 181 data, 182 session, 183 subs, 184 } = self.ctx.shared(); 185 186 let (id, smallest) = take_smallest_sub_reqs(subs, &mut data.relay_subs)?; 187 188 session.tasks.insert(id, SubSessionTask::Removed); 189 for id in smallest.requests.requests { 190 self.ctx.data().request_to_sid.remove(&id); 191 self.place(id); 192 } 193 194 Some(smallest.sub_pass) 195 } 196 197 #[profiling::function] 198 fn new_sub(&mut self, id: OutboxSubId) -> PlaceResult { 199 let Some(new_pass) = self.sub_guardian.take_pass() else { 200 // pass not available, try to place on an existing sub 201 return self.place(id); 202 }; 203 204 let relay_id = RelayReqId::default(); 205 let mut requests = SubRequests::default(); 206 requests.add(id); 207 208 let SharedCtx { 209 data, 210 session, 211 subs: _, 212 } = self.ctx.shared(); 213 data.relay_subs.insert( 214 relay_id.clone(), 215 RelaySubData { 216 requests, 217 status: RelayReqStatus::InitialQuery, 218 sub_pass: new_pass, 219 }, 220 ); 221 data.request_to_sid.insert(id, relay_id.clone()); 222 session.tasks.insert(relay_id, SubSessionTask::New); 223 PlaceResult::Placed 224 } 225 226 #[profiling::function] 227 pub fn subscribe(&mut self, id: OutboxSubId) -> PlaceResult { 228 let SharedCtx { 229 data, 230 session, 231 subs: _, 232 } = self.ctx.shared(); 233 let Some(relay_id) = data.request_to_sid.get(&id) else { 234 return self.new_sub(id); 235 }; 236 237 let Some(sub_data) = data.relay_subs.get_mut(relay_id) else { 238 return self.new_sub(id); 239 }; 240 241 // modifying a filter 242 sub_data.requests.add(id); 243 244 sub_data.status = RelayReqStatus::InitialQuery; 245 246 session 247 .tasks 248 .insert(relay_id.clone(), SubSessionTask::Touched); 249 tracing::debug!("Placed {id:?} on an existing subscription: {relay_id:?}"); 250 PlaceResult::Placed 251 } 252 253 #[profiling::function] 254 pub fn unsubscribe(&mut self, id: OutboxSubId) { 255 let SharedCtx { 256 data: compaction_data, 257 session, 258 subs: _, 259 } = self.ctx.shared(); 260 let Some(relay_id) = compaction_data.request_to_sid.remove(&id) else { 261 compaction_data.queue.add(id, RelayTask::Unsubscribe); 262 return; 263 }; 264 265 let Some(data) = compaction_data.relay_subs.get_mut(&relay_id) else { 266 compaction_data.queue.add(id, RelayTask::Unsubscribe); 267 return; 268 }; 269 270 data.status = RelayReqStatus::InitialQuery; 271 272 if !data.requests.remove(&id) { 273 return; 274 } 275 276 if !data.requests.is_empty() { 277 session 278 .tasks 279 .insert(relay_id.clone(), SubSessionTask::Touched); 280 return; 281 } 282 283 let Some(data) = compaction_data.relay_subs.remove(&relay_id) else { 284 return; 285 }; 286 287 self.sub_guardian.return_pass(data.sub_pass); 288 tracing::debug!("Unsubed from last internal id in REQ, returning pass"); 289 session 290 .tasks 291 .insert(relay_id.clone(), SubSessionTask::Removed); 292 } 293 294 #[profiling::function] 295 fn place(&mut self, id: OutboxSubId) -> PlaceResult { 296 let SharedCtx { 297 data, 298 session, 299 subs, 300 } = self.ctx.shared(); 301 let placed_on = 'place: { 302 for (relay_id, relay_data) in &mut data.relay_subs { 303 if !relay_data.requests.can_fit(subs, &id, self.json_limit) { 304 continue; 305 } 306 307 session 308 .tasks 309 .insert(relay_id.clone(), SubSessionTask::Touched); 310 relay_data.requests.add(id); 311 break 'place Some(relay_id.clone()); 312 } 313 314 None 315 }; 316 317 if let Some(relay_id) = placed_on { 318 data.request_to_sid.insert(id, relay_id); 319 return PlaceResult::Placed; 320 } 321 322 data.queue.add(id, RelayTask::Subscribe); 323 PlaceResult::Queued 324 } 325 } 326 327 #[derive(Debug, PartialEq, Eq)] 328 pub enum PlaceResult { 329 Placed, 330 Queued, 331 } 332 333 fn take_smallest_sub_reqs( 334 subs: &OutboxSubscriptions, 335 data: &mut HashMap<RelayReqId, RelaySubData>, 336 ) -> Option<(RelayReqId, RelaySubData)> { 337 let mut smallest = usize::MAX; 338 let mut res = None; 339 340 for (id, d) in data.iter() { 341 let cur_size = subs.json_size_sum(&d.requests.requests); 342 if cur_size < smallest { 343 smallest = cur_size; 344 res = Some(id.clone()); 345 } 346 } 347 348 let id = res?; 349 350 data.remove(&id).map(|r| (id, r)) 351 } 352 353 #[derive(Default)] 354 struct CompactionSubSession { 355 tasks: HashMap<RelayReqId, SubSessionTask>, 356 } 357 358 enum SubSessionTask { 359 New, 360 Touched, 361 Removed, 362 } 363 364 enum CompactionCtx<'a> { 365 Active(CompactionHandler<'a>), 366 Inactive { 367 data: &'a mut CompactionData, 368 session: CompactionSubSession, 369 subs: &'a OutboxSubscriptions, 370 }, 371 } 372 373 impl<'a> CompactionCtx<'a> { 374 #[profiling::function] 375 pub fn shared(&mut self) -> SharedCtx<'_> { 376 match self { 377 CompactionCtx::Active(compaction_handler) => SharedCtx { 378 data: compaction_handler.data, 379 session: &mut compaction_handler.session, 380 subs: compaction_handler.subs, 381 }, 382 CompactionCtx::Inactive { 383 data, 384 session, 385 subs, 386 } => SharedCtx { 387 data, 388 session, 389 subs, 390 }, 391 } 392 } 393 394 pub fn data(&mut self) -> &mut CompactionData { 395 match self { 396 CompactionCtx::Active(compaction_handler) => compaction_handler.data, 397 CompactionCtx::Inactive { 398 data, 399 session: _, 400 subs: _, 401 } => data, 402 } 403 } 404 } 405 struct SharedCtx<'a> { 406 data: &'a mut CompactionData, 407 session: &'a mut CompactionSubSession, 408 subs: &'a OutboxSubscriptions, 409 } 410 411 struct CompactionHandler<'a> { 412 relay: &'a mut WebsocketRelay, 413 data: &'a mut CompactionData, 414 subs: &'a OutboxSubscriptions, 415 pub session: CompactionSubSession, 416 } 417 418 impl<'a> Drop for CompactionHandler<'a> { 419 #[profiling::function] 420 fn drop(&mut self) { 421 for (id, task) in &self.session.tasks { 422 match task { 423 SubSessionTask::Touched => { 424 let Some(data) = self.data.relay_subs.get_mut(id) else { 425 continue; 426 }; 427 428 let filters = self.subs.filters_all(&data.requests.requests); 429 430 if filters.is_empty() { 431 self.relay.conn.send(&ClientMessage::close(id.0.clone())); 432 } else { 433 self.relay 434 .conn 435 .send(&ClientMessage::req(id.0.clone(), filters)); 436 } 437 } 438 SubSessionTask::Removed => { 439 self.relay.conn.send(&ClientMessage::close(id.0.clone())); 440 } 441 SubSessionTask::New => { 442 let Some(data) = self.data.relay_subs.get(id) else { 443 continue; 444 }; 445 446 let filters = self.subs.filters_all(&data.requests.requests); 447 self.relay 448 .conn 449 .send(&ClientMessage::req(id.0.clone(), filters)); 450 } 451 } 452 } 453 } 454 } 455 456 fn are_filters_empty(filters: &Vec<Filter>) -> bool { 457 if filters.is_empty() { 458 return true; 459 } 460 461 for filter in filters { 462 if filter.num_elements() != 0 { 463 return false; 464 } 465 } 466 467 true 468 } 469 470 impl<'a> CompactionHandler<'a> { 471 pub fn new( 472 relay: &'a mut WebsocketRelay, 473 data: &'a mut CompactionData, 474 subs: &'a OutboxSubscriptions, 475 ) -> Self { 476 Self { 477 relay, 478 data, 479 session: CompactionSubSession::default(), 480 subs, 481 } 482 } 483 } 484 485 /// Represents a singular REQ to a relay 486 struct RelaySubData { 487 requests: SubRequests, 488 status: RelayReqStatus, 489 sub_pass: SubPass, 490 } 491 492 #[derive(Default)] 493 struct SubRequests { 494 pub requests: HashSet<OutboxSubId>, 495 } 496 497 impl SubRequests { 498 #[profiling::function] 499 pub fn add(&mut self, id: OutboxSubId) { 500 self.requests.insert(id); 501 } 502 503 pub fn remove(&mut self, id: &OutboxSubId) -> bool { 504 self.requests.remove(id) 505 } 506 507 pub fn is_empty(&self) -> bool { 508 self.requests.is_empty() 509 } 510 511 pub fn can_fit( 512 &self, 513 subs: &OutboxSubscriptions, 514 new: &OutboxSubId, 515 json_limit: usize, 516 ) -> bool { 517 let Some(new_size) = subs.json_size(new) else { 518 return true; 519 }; 520 521 let cur_json_size = subs.json_size_sum(&self.requests); 522 523 // `["REQ","abc...123"]`; 524 // 12345678 ... 90 -> 10 characters excluding the UUID 525 cur_json_size + new_size + 10 + RelayReqId::byte_len() <= json_limit 526 } 527 } 528 529 #[derive(Default)] 530 pub struct CompactionSession { 531 // Number of subs which should be free after ingestion. Subs will compact enough to free up that number of subs 532 // OR as much as possible without dropping any existing subs 533 request_free: usize, 534 tasks: HashMap<OutboxSubId, RelayTask>, 535 } 536 537 impl CompactionSession { 538 pub fn request_free_subs(&mut self, num_free: usize) { 539 self.request_free = num_free; 540 } 541 542 pub fn unsub(&mut self, unsub: OutboxSubId) { 543 self.tasks.insert(unsub, RelayTask::Unsubscribe); 544 } 545 546 pub fn sub(&mut self, id: OutboxSubId) { 547 self.tasks.insert(id, RelayTask::Subscribe); 548 } 549 550 pub fn is_empty(&self) -> bool { 551 self.tasks.is_empty() && self.request_free == 0 552 } 553 } 554 555 #[cfg(test)] 556 mod tests { 557 use super::*; 558 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 559 use hashbrown::HashSet; 560 561 // ==================== CompactionData tests ==================== 562 563 #[test] 564 fn compaction_data_default_empty() { 565 let data = CompactionData::default(); 566 assert_eq!(data.num_subs(), 0); 567 } 568 569 #[test] 570 fn compaction_data_req_status_none_for_unknown() { 571 let data = CompactionData::default(); 572 assert!(data.req_status(&OutboxSubId(999)).is_none()); 573 } 574 575 #[test] 576 fn compaction_data_has_eose_false_for_unknown() { 577 let data = CompactionData::default(); 578 assert!(!data.has_eose(&OutboxSubId(999))); 579 } 580 581 #[test] 582 fn compaction_data_set_req_status_ignores_unknown_sid() { 583 let mut data = CompactionData::default(); 584 // Should not panic or error when setting status for unknown sid 585 data.set_req_status("unknown-sid", RelayReqStatus::Eose); 586 } 587 588 #[test] 589 fn compaction_data_ids_returns_sub_ids() { 590 let mut data = CompactionData::default(); 591 let mut guardian = SubPassGuardian::new(1); 592 let pass = guardian.take_pass().unwrap(); 593 594 let id = OutboxSubId(7); 595 let relay_id = RelayReqId::from("req-123"); 596 let mut requests = SubRequests::default(); 597 requests.add(id); 598 data.relay_subs.insert( 599 relay_id.clone(), 600 RelaySubData { 601 requests, 602 status: RelayReqStatus::InitialQuery, 603 sub_pass: pass, 604 }, 605 ); 606 607 let ids = data.ids(&relay_id); 608 assert!(ids.is_some()); 609 assert!(ids.unwrap().contains(&id)); 610 } 611 612 #[test] 613 fn compaction_data_set_req_status_updates_status() { 614 let mut data = CompactionData::default(); 615 616 // Manually set up a relay subscription 617 let relay_id = RelayReqId::from("test-sid"); 618 let mut guardian = SubPassGuardian::new(1); 619 let pass = guardian.take_pass().unwrap(); 620 621 data.relay_subs.insert( 622 relay_id.clone(), 623 RelaySubData { 624 requests: SubRequests::default(), 625 status: RelayReqStatus::InitialQuery, 626 sub_pass: pass, 627 }, 628 ); 629 630 // Set EOSE should update status 631 data.set_req_status("test-sid", RelayReqStatus::Eose); 632 633 // Verify status was set 634 let sub_data = data.relay_subs.get(&relay_id).unwrap(); 635 assert_eq!(sub_data.status, RelayReqStatus::Eose); 636 } 637 638 // ==================== SubRequests tests ==================== 639 640 /// can_fit returns true when combined JSON size is under the limit. 641 #[test] 642 fn sub_requests_can_fit() { 643 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 644 use hashbrown::HashSet; 645 646 let mut subs = OutboxSubscriptions::default(); 647 subs.new_subscription( 648 OutboxSubId(0), 649 SubscribeTask { 650 filters: vec![Filter::new().kinds(vec![1]).build()], 651 relays: RelayUrlPkgs::new(HashSet::new()), 652 }, 653 false, 654 ); 655 656 let requests = SubRequests::default(); 657 658 assert!(requests.can_fit(&subs, &OutboxSubId(0), 1_000_000)); 659 assert!(!requests.can_fit(&subs, &OutboxSubId(0), 5)); 660 } 661 662 // ==================== CompactionSession tests ==================== 663 664 #[test] 665 fn compaction_session_default() { 666 let session = CompactionSession::default(); 667 assert_eq!(session.request_free, 0); 668 assert!(session.tasks.is_empty()); 669 } 670 671 #[test] 672 fn compaction_session_unsub() { 673 let mut session = CompactionSession::default(); 674 session.unsub(OutboxSubId(42)); 675 676 assert!(session.tasks.contains_key(&OutboxSubId(42))); 677 match session.tasks.get(&OutboxSubId(42)) { 678 Some(RelayTask::Unsubscribe) => (), 679 _ => panic!("Expected Unsubscribe task"), 680 } 681 } 682 683 #[test] 684 fn compaction_session_sub() { 685 let mut session = CompactionSession::default(); 686 session.sub(OutboxSubId(1)); 687 688 assert!(session.tasks.contains_key(&OutboxSubId(1))); 689 assert!(matches!( 690 session.tasks.get(&OutboxSubId(1)), 691 Some(RelayTask::Subscribe) 692 )); 693 } 694 695 // ==================== take_smallest_sub_reqs tests ==================== 696 697 #[test] 698 fn take_smallest_returns_none_for_empty() { 699 let subs = OutboxSubscriptions::default(); 700 let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new(); 701 assert!(take_smallest_sub_reqs(&subs, &mut data).is_none()); 702 } 703 704 /// Returns the relay sub with the smallest combined JSON size. 705 #[test] 706 fn take_smallest_returns_smallest_by_json_size() { 707 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 708 use hashbrown::HashSet; 709 710 // Register subscriptions with different JSON sizes 711 let mut subs = OutboxSubscriptions::default(); 712 subs.new_subscription( 713 OutboxSubId(0), 714 SubscribeTask { 715 filters: vec![Filter::new().kinds(vec![1]).build()], 716 relays: RelayUrlPkgs::new(HashSet::new()), 717 }, 718 false, 719 ); 720 subs.new_subscription( 721 OutboxSubId(1), 722 SubscribeTask { 723 filters: vec![Filter::new() 724 .kinds(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) 725 .build()], 726 relays: RelayUrlPkgs::new(HashSet::new()), 727 }, 728 false, 729 ); 730 731 let mut guardian = SubPassGuardian::new(2); 732 733 // Small relay sub contains id 0 734 let mut small_requests = SubRequests::default(); 735 small_requests.add(OutboxSubId(0)); 736 737 // Large relay sub contains id 1 738 let mut large_requests = SubRequests::default(); 739 large_requests.add(OutboxSubId(1)); 740 741 let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new(); 742 data.insert( 743 RelayReqId::from("small"), 744 RelaySubData { 745 requests: small_requests, 746 status: RelayReqStatus::InitialQuery, 747 sub_pass: guardian.take_pass().unwrap(), 748 }, 749 ); 750 data.insert( 751 RelayReqId::from("large"), 752 RelaySubData { 753 requests: large_requests, 754 status: RelayReqStatus::InitialQuery, 755 sub_pass: guardian.take_pass().unwrap(), 756 }, 757 ); 758 759 let (id, _) = take_smallest_sub_reqs(&subs, &mut data).unwrap(); 760 assert_eq!(id.0, "small"); 761 assert_eq!(data.len(), 1); 762 } 763 764 #[test] 765 fn take_smallest_removes_from_map() { 766 let subs = OutboxSubscriptions::default(); 767 let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new(); 768 let mut guardian = SubPassGuardian::new(1); 769 770 data.insert( 771 RelayReqId::from("only"), 772 RelaySubData { 773 requests: SubRequests::default(), 774 status: RelayReqStatus::InitialQuery, 775 sub_pass: guardian.take_pass().unwrap(), 776 }, 777 ); 778 779 let result = take_smallest_sub_reqs(&subs, &mut data); 780 assert!(result.is_some()); 781 assert!(data.is_empty()); 782 } 783 784 // ==================== CompactionRelay tests ==================== 785 786 /// Requesting free subs when there's nothing to compact has no effect. 787 #[test] 788 fn compact_returns_none_when_no_subs() { 789 let subs = OutboxSubscriptions::default(); 790 let mut data = CompactionData::default(); 791 let mut guardian = SubPassGuardian::new(5); 792 let json_limit = 100000; 793 794 let initial_passes = guardian.available_passes(); 795 796 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 797 let mut session = CompactionSession::default(); 798 session.request_free_subs(1); 799 relay.ingest_session(session); 800 801 assert_eq!(guardian.available_passes(), initial_passes); 802 } 803 804 /// Compacting frees a pass and redistributes requests to remaining subs. 805 #[test] 806 fn compact_frees_pass_and_redistributes() { 807 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 808 use hashbrown::HashSet; 809 810 let mut subs = OutboxSubscriptions::default(); 811 subs.new_subscription( 812 OutboxSubId(0), 813 SubscribeTask { 814 filters: vec![Filter::new().kinds(vec![1]).build()], 815 relays: RelayUrlPkgs::new(HashSet::new()), 816 }, 817 false, 818 ); 819 subs.new_subscription( 820 OutboxSubId(1), 821 SubscribeTask { 822 filters: vec![Filter::new() 823 .kinds(vec![2, 3, 4, 5, 6, 7, 8, 9, 10]) 824 .build()], 825 relays: RelayUrlPkgs::new(HashSet::new()), 826 }, 827 false, 828 ); 829 830 let mut data = CompactionData::default(); 831 let mut guardian = SubPassGuardian::new(5); 832 let json_limit = 100000; 833 834 // Create 2 relay subs 835 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 836 let mut session = CompactionSession::default(); 837 session.sub(OutboxSubId(0)); 838 session.sub(OutboxSubId(1)); 839 relay.ingest_session(session); 840 841 assert_eq!(data.relay_subs.len(), 2); 842 assert_eq!(guardian.available_passes(), 3); // 5 - 2 843 844 // Request 4 free passes - must compact 1 845 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 846 let mut session = CompactionSession::default(); 847 session.request_free_subs(4); 848 relay.ingest_session(session); 849 850 assert_eq!(data.relay_subs.len(), 1); 851 assert_eq!(guardian.available_passes(), 4); 852 853 let remaining = data.relay_subs.values().next().unwrap(); 854 assert_eq!(remaining.requests.requests.len(), 2); 855 } 856 857 /// When compaction redistributes a request but the remaining sub 858 /// doesn't have room, the request goes to the queue. 859 #[test] 860 fn place_queues_when_no_room() { 861 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 862 use hashbrown::HashSet; 863 864 let mut subs = OutboxSubscriptions::default(); 865 subs.new_subscription( 866 OutboxSubId(0), 867 SubscribeTask { 868 filters: vec![Filter::new().kinds(vec![1]).build()], 869 relays: RelayUrlPkgs::new(HashSet::new()), 870 }, 871 false, 872 ); 873 subs.new_subscription( 874 OutboxSubId(1), 875 SubscribeTask { 876 filters: vec![Filter::new().kinds(vec![2]).build()], 877 relays: RelayUrlPkgs::new(HashSet::new()), 878 }, 879 false, 880 ); 881 882 // Set limit so combined filters won't fit in one REQ 883 let size0 = subs.json_size(&OutboxSubId(0)).unwrap(); 884 let size1 = subs.json_size(&OutboxSubId(1)).unwrap(); 885 let json_limit = size0 + size1 - 1; 886 887 let mut data = CompactionData::default(); 888 let mut guardian = SubPassGuardian::new(2); 889 890 // Create 2 relay subs at capacity 891 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 892 let mut session = CompactionSession::default(); 893 session.sub(OutboxSubId(0)); 894 session.sub(OutboxSubId(1)); 895 relay.ingest_session(session); 896 897 assert_eq!(data.relay_subs.len(), 2); 898 assert!(data.queue.is_empty()); 899 900 // Compact 1 - redistributed request won't fit 901 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 902 let mut session = CompactionSession::default(); 903 session.request_free_subs(1); 904 relay.ingest_session(session); 905 906 assert_eq!(data.relay_subs.len(), 1); 907 assert!(!data.queue.is_empty()); 908 } 909 910 /// When no passes are available, requests are placed on existing relay subs. 911 #[test] 912 fn new_sub_places_on_existing_when_no_passes() { 913 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 914 use hashbrown::HashSet; 915 916 let mut subs = OutboxSubscriptions::default(); 917 subs.new_subscription( 918 OutboxSubId(0), 919 SubscribeTask { 920 filters: vec![Filter::new().kinds(vec![1]).build()], 921 relays: RelayUrlPkgs::new(HashSet::new()), 922 }, 923 false, 924 ); 925 subs.new_subscription( 926 OutboxSubId(1), 927 SubscribeTask { 928 filters: vec![Filter::new().kinds(vec![2]).build()], 929 relays: RelayUrlPkgs::new(HashSet::new()), 930 }, 931 false, 932 ); 933 934 let mut data = CompactionData::default(); 935 let mut guardian = SubPassGuardian::new(1); // Only 1 pass 936 let json_limit = 100000; 937 938 // Add 2 requests with only 1 pass - second must go on existing 939 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 940 let mut session = CompactionSession::default(); 941 session.sub(OutboxSubId(0)); 942 session.sub(OutboxSubId(1)); 943 relay.ingest_session(session); 944 945 assert_eq!(data.relay_subs.len(), 1); 946 let sub = data.relay_subs.values().next().unwrap(); 947 assert_eq!(sub.requests.requests.len(), 2); 948 } 949 950 /// Subscriptions placed onto an existing compacted REQ must register 951 /// request-to-relay mapping so a later unsubscribe updates the correct REQ. 952 #[test] 953 fn unsubscribe_after_place_on_existing_removes_request() { 954 use crate::relay::{RelayUrlPkgs, SubscribeTask}; 955 use hashbrown::HashSet; 956 957 let mut subs = OutboxSubscriptions::default(); 958 subs.new_subscription( 959 OutboxSubId(0), 960 SubscribeTask { 961 filters: vec![Filter::new().kinds(vec![1]).build()], 962 relays: RelayUrlPkgs::new(HashSet::new()), 963 }, 964 false, 965 ); 966 subs.new_subscription( 967 OutboxSubId(1), 968 SubscribeTask { 969 filters: vec![Filter::new().kinds(vec![2]).build()], 970 relays: RelayUrlPkgs::new(HashSet::new()), 971 }, 972 false, 973 ); 974 975 let mut data = CompactionData::default(); 976 let mut guardian = SubPassGuardian::new(1); // Force second sub onto existing REQ 977 let json_limit = 100000; 978 979 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 980 let mut session = CompactionSession::default(); 981 session.sub(OutboxSubId(0)); 982 session.sub(OutboxSubId(1)); 983 relay.ingest_session(session); 984 985 assert_eq!(data.relay_subs.len(), 1); 986 let relay_id = data.relay_subs.keys().next().cloned().unwrap(); 987 assert_eq!(data.request_to_sid.get(&OutboxSubId(0)), Some(&relay_id)); 988 assert_eq!(data.request_to_sid.get(&OutboxSubId(1)), Some(&relay_id)); 989 990 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 991 let mut session = CompactionSession::default(); 992 session.unsub(OutboxSubId(1)); 993 relay.ingest_session(session); 994 995 assert!(data.queue.is_empty()); 996 assert_eq!(data.relay_subs.len(), 1); 997 let sub = data.relay_subs.get(&relay_id).unwrap(); 998 assert_eq!(sub.requests.requests.len(), 1); 999 assert!(sub.requests.requests.contains(&OutboxSubId(0))); 1000 assert!(!sub.requests.requests.contains(&OutboxSubId(1))); 1001 assert_eq!(data.request_to_sid.get(&OutboxSubId(0)), Some(&relay_id)); 1002 assert!(!data.request_to_sid.contains_key(&OutboxSubId(1))); 1003 } 1004 1005 /// When requesting multiple free passes, multiple subs are compacted 1006 /// and all requests are consolidated into fewer relay subs. 1007 #[test] 1008 fn compact_multiple_subs() { 1009 let mut data = CompactionData::default(); 1010 let mut guardian = SubPassGuardian::new(3); 1011 let json_limit = 100000; 1012 let mut subs = OutboxSubscriptions::default(); 1013 for i in 0..3 { 1014 subs.new_subscription( 1015 OutboxSubId(i), 1016 SubscribeTask { 1017 filters: vec![Filter::new().kinds(vec![i as u64 + 1]).build()], 1018 relays: RelayUrlPkgs::new(HashSet::new()), 1019 }, 1020 false, 1021 ); 1022 } 1023 1024 // Create 3 subs and request 2 free in same session 1025 let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs); 1026 let mut session = CompactionSession::default(); 1027 for i in 0..3 { 1028 session.sub(OutboxSubId(i)); 1029 } 1030 session.request_free_subs(2); 1031 relay.ingest_session(session); 1032 1033 // Should compact down to 1 sub with all 3 requests 1034 assert_eq!(data.relay_subs.len(), 1); 1035 assert_eq!(guardian.available_passes(), 2); 1036 1037 let sub = data.relay_subs.values().next().unwrap(); 1038 assert_eq!(sub.requests.requests.len(), 3); 1039 } 1040 }