notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

compaction.rs (32467B)


      1 use std::collections::HashMap;
      2 
      3 use hashbrown::HashSet;
      4 use nostrdb::Filter;
      5 
      6 use crate::{
      7     relay::{
      8         websocket::WebsocketRelay, OutboxSubId, OutboxSubscriptions, QueuedTasks, RelayReqId,
      9         RelayReqStatus, RelayTask, SubPass, SubPassGuardian, SubPassRevocation,
     10     },
     11     ClientMessage,
     12 };
     13 
     14 /// CompactionData tracks every compaction REQ on a relay along with the
     15 /// Outbox sub ids routed into it.
     16 #[derive(Default)]
     17 pub struct CompactionData {
     18     request_to_sid: HashMap<OutboxSubId, RelayReqId>, // we never split outbox subs over multiple REQs
     19     relay_subs: HashMap<RelayReqId, RelaySubData>,    // UUID
     20     queue: QueuedTasks,
     21 }
     22 
     23 impl CompactionData {
     24     #[allow(dead_code)]
     25     pub fn num_subs(&self) -> usize {
     26         self.relay_subs.len()
     27     }
     28 
     29     pub fn set_req_status(&mut self, sid: &str, status: RelayReqStatus) {
     30         let Some(data) = self.relay_subs.get_mut(sid) else {
     31             return;
     32         };
     33 
     34         data.status = status;
     35     }
     36 
     37     pub fn req_status(&self, id: &OutboxSubId) -> Option<RelayReqStatus> {
     38         let sid = self.request_to_sid.get(id)?;
     39         let data = self.relay_subs.get(sid)?;
     40         Some(data.status)
     41     }
     42 
     43     #[allow(dead_code)]
     44     pub fn has_eose(&self, id: &OutboxSubId) -> bool {
     45         self.req_status(id) == Some(RelayReqStatus::Eose)
     46     }
     47 
     48     /// Returns the OutboxSubIds associated with the given relay subscription ID.
     49     pub fn ids(&self, sid: &RelayReqId) -> Option<&HashSet<OutboxSubId>> {
     50         self.relay_subs.get(sid).map(|d| &d.requests.requests)
     51     }
     52 }
     53 
     54 /// Ensures `max_subs` REQ to the websocket relay by "compacting" subscriptions (combining multiple requests into one)
     55 pub struct CompactionRelay<'a> {
     56     ctx: CompactionCtx<'a>,
     57     sub_guardian: &'a mut SubPassGuardian,
     58     json_limit: usize,
     59 }
     60 
     61 /// CompactionRelay ensures multiple Outbox subscriptions are packed into as few
     62 /// REQs as possible, respecting per-relay limits.
     63 impl<'a> CompactionRelay<'a> {
     64     pub fn new(
     65         relay: Option<&'a mut WebsocketRelay>,
     66         data: &'a mut CompactionData,
     67         json_limit: usize,
     68         sub_guardian: &'a mut SubPassGuardian,
     69         subs: &'a OutboxSubscriptions,
     70     ) -> Self {
     71         let ctx = match relay {
     72             Some(relay) => CompactionCtx::Active(CompactionHandler::new(relay, data, subs)),
     73             None => CompactionCtx::Inactive {
     74                 data,
     75                 session: CompactionSubSession::default(),
     76                 subs,
     77             },
     78         };
     79         Self {
     80             ctx,
     81             sub_guardian,
     82             json_limit,
     83         }
     84     }
     85 
     86     #[profiling::function]
     87     pub fn ingest_session(mut self, session: CompactionSession) {
     88         let request_free = session.request_free;
     89         let mut reserved: Vec<SubPass> = Vec::new();
     90 
     91         // Reserve passes - take from guardian or compact to free them
     92         while reserved.len() < request_free {
     93             if let Some(pass) = self.sub_guardian.take_pass() {
     94                 reserved.push(pass);
     95             } else if let Some(ejected_pass) = self.compact() {
     96                 reserved.push(ejected_pass);
     97             } else {
     98                 break;
     99             }
    100         }
    101 
    102         // Process session (can't touch reserved passes)
    103         self.ingest_session_internal(session);
    104 
    105         // Drain queue
    106         {
    107             profiling::scope!("drain queue");
    108             loop {
    109                 let Some(id) = self.ctx.data().queue.pop() else {
    110                     break;
    111                 };
    112                 if self.subscribe(id) == PlaceResult::Queued {
    113                     break;
    114                 }
    115             }
    116         }
    117 
    118         // Return reserved passes
    119         for pass in reserved {
    120             self.sub_guardian.return_pass(pass);
    121         }
    122     }
    123 
    124     #[profiling::function]
    125     fn ingest_session_internal(&mut self, session: CompactionSession) {
    126         for (id, task) in session.tasks {
    127             match task {
    128                 RelayTask::Unsubscribe => {
    129                     self.unsubscribe(id);
    130                 }
    131                 RelayTask::Subscribe => {
    132                     self.subscribe(id);
    133                 }
    134             }
    135         }
    136     }
    137 
    138     #[profiling::function]
    139     pub fn handle_relay_open(&mut self) {
    140         let CompactionCtx::Active(handler) = &mut self.ctx else {
    141             return;
    142         };
    143 
    144         if !handler.relay.is_connected() {
    145             return;
    146         }
    147 
    148         for (sid, sub_data) in &handler.data.relay_subs {
    149             let filters = handler.subs.filters_all(&sub_data.requests.requests);
    150             if are_filters_empty(&filters) {
    151                 continue;
    152             }
    153 
    154             handler
    155                 .relay
    156                 .conn
    157                 .send(&ClientMessage::req(sid.to_string(), filters));
    158         }
    159     }
    160 
    161     #[allow(dead_code)]
    162     pub fn revocate(&mut self, mut revocation: SubPassRevocation) {
    163         let Some(pass) = self.compact() else {
    164             // this shouldn't be possible
    165             return;
    166         };
    167 
    168         revocation.revocate(pass);
    169     }
    170 
    171     #[allow(dead_code)]
    172     pub fn revocate_all(&mut self, revocations: Vec<SubPassRevocation>) {
    173         for revocation in revocations {
    174             self.revocate(revocation);
    175         }
    176     }
    177 
    178     #[profiling::function]
    179     fn compact(&mut self) -> Option<SubPass> {
    180         let SharedCtx {
    181             data,
    182             session,
    183             subs,
    184         } = self.ctx.shared();
    185 
    186         let (id, smallest) = take_smallest_sub_reqs(subs, &mut data.relay_subs)?;
    187 
    188         session.tasks.insert(id, SubSessionTask::Removed);
    189         for id in smallest.requests.requests {
    190             self.ctx.data().request_to_sid.remove(&id);
    191             self.place(id);
    192         }
    193 
    194         Some(smallest.sub_pass)
    195     }
    196 
    197     #[profiling::function]
    198     fn new_sub(&mut self, id: OutboxSubId) -> PlaceResult {
    199         let Some(new_pass) = self.sub_guardian.take_pass() else {
    200             // pass not available, try to place on an existing sub
    201             return self.place(id);
    202         };
    203 
    204         let relay_id = RelayReqId::default();
    205         let mut requests = SubRequests::default();
    206         requests.add(id);
    207 
    208         let SharedCtx {
    209             data,
    210             session,
    211             subs: _,
    212         } = self.ctx.shared();
    213         data.relay_subs.insert(
    214             relay_id.clone(),
    215             RelaySubData {
    216                 requests,
    217                 status: RelayReqStatus::InitialQuery,
    218                 sub_pass: new_pass,
    219             },
    220         );
    221         data.request_to_sid.insert(id, relay_id.clone());
    222         session.tasks.insert(relay_id, SubSessionTask::New);
    223         PlaceResult::Placed
    224     }
    225 
    226     #[profiling::function]
    227     pub fn subscribe(&mut self, id: OutboxSubId) -> PlaceResult {
    228         let SharedCtx {
    229             data,
    230             session,
    231             subs: _,
    232         } = self.ctx.shared();
    233         let Some(relay_id) = data.request_to_sid.get(&id) else {
    234             return self.new_sub(id);
    235         };
    236 
    237         let Some(sub_data) = data.relay_subs.get_mut(relay_id) else {
    238             return self.new_sub(id);
    239         };
    240 
    241         // modifying a filter
    242         sub_data.requests.add(id);
    243 
    244         sub_data.status = RelayReqStatus::InitialQuery;
    245 
    246         session
    247             .tasks
    248             .insert(relay_id.clone(), SubSessionTask::Touched);
    249         tracing::debug!("Placed {id:?} on an existing subscription: {relay_id:?}");
    250         PlaceResult::Placed
    251     }
    252 
    253     #[profiling::function]
    254     pub fn unsubscribe(&mut self, id: OutboxSubId) {
    255         let SharedCtx {
    256             data: compaction_data,
    257             session,
    258             subs: _,
    259         } = self.ctx.shared();
    260         let Some(relay_id) = compaction_data.request_to_sid.remove(&id) else {
    261             compaction_data.queue.add(id, RelayTask::Unsubscribe);
    262             return;
    263         };
    264 
    265         let Some(data) = compaction_data.relay_subs.get_mut(&relay_id) else {
    266             compaction_data.queue.add(id, RelayTask::Unsubscribe);
    267             return;
    268         };
    269 
    270         data.status = RelayReqStatus::InitialQuery;
    271 
    272         if !data.requests.remove(&id) {
    273             return;
    274         }
    275 
    276         if !data.requests.is_empty() {
    277             session
    278                 .tasks
    279                 .insert(relay_id.clone(), SubSessionTask::Touched);
    280             return;
    281         }
    282 
    283         let Some(data) = compaction_data.relay_subs.remove(&relay_id) else {
    284             return;
    285         };
    286 
    287         self.sub_guardian.return_pass(data.sub_pass);
    288         tracing::debug!("Unsubed from last internal id in REQ, returning pass");
    289         session
    290             .tasks
    291             .insert(relay_id.clone(), SubSessionTask::Removed);
    292     }
    293 
    294     #[profiling::function]
    295     fn place(&mut self, id: OutboxSubId) -> PlaceResult {
    296         let SharedCtx {
    297             data,
    298             session,
    299             subs,
    300         } = self.ctx.shared();
    301         let placed_on = 'place: {
    302             for (relay_id, relay_data) in &mut data.relay_subs {
    303                 if !relay_data.requests.can_fit(subs, &id, self.json_limit) {
    304                     continue;
    305                 }
    306 
    307                 session
    308                     .tasks
    309                     .insert(relay_id.clone(), SubSessionTask::Touched);
    310                 relay_data.requests.add(id);
    311                 break 'place Some(relay_id.clone());
    312             }
    313 
    314             None
    315         };
    316 
    317         if let Some(relay_id) = placed_on {
    318             data.request_to_sid.insert(id, relay_id);
    319             return PlaceResult::Placed;
    320         }
    321 
    322         data.queue.add(id, RelayTask::Subscribe);
    323         PlaceResult::Queued
    324     }
    325 }
    326 
    327 #[derive(Debug, PartialEq, Eq)]
    328 pub enum PlaceResult {
    329     Placed,
    330     Queued,
    331 }
    332 
    333 fn take_smallest_sub_reqs(
    334     subs: &OutboxSubscriptions,
    335     data: &mut HashMap<RelayReqId, RelaySubData>,
    336 ) -> Option<(RelayReqId, RelaySubData)> {
    337     let mut smallest = usize::MAX;
    338     let mut res = None;
    339 
    340     for (id, d) in data.iter() {
    341         let cur_size = subs.json_size_sum(&d.requests.requests);
    342         if cur_size < smallest {
    343             smallest = cur_size;
    344             res = Some(id.clone());
    345         }
    346     }
    347 
    348     let id = res?;
    349 
    350     data.remove(&id).map(|r| (id, r))
    351 }
    352 
    353 #[derive(Default)]
    354 struct CompactionSubSession {
    355     tasks: HashMap<RelayReqId, SubSessionTask>,
    356 }
    357 
    358 enum SubSessionTask {
    359     New,
    360     Touched,
    361     Removed,
    362 }
    363 
    364 enum CompactionCtx<'a> {
    365     Active(CompactionHandler<'a>),
    366     Inactive {
    367         data: &'a mut CompactionData,
    368         session: CompactionSubSession,
    369         subs: &'a OutboxSubscriptions,
    370     },
    371 }
    372 
    373 impl<'a> CompactionCtx<'a> {
    374     #[profiling::function]
    375     pub fn shared(&mut self) -> SharedCtx<'_> {
    376         match self {
    377             CompactionCtx::Active(compaction_handler) => SharedCtx {
    378                 data: compaction_handler.data,
    379                 session: &mut compaction_handler.session,
    380                 subs: compaction_handler.subs,
    381             },
    382             CompactionCtx::Inactive {
    383                 data,
    384                 session,
    385                 subs,
    386             } => SharedCtx {
    387                 data,
    388                 session,
    389                 subs,
    390             },
    391         }
    392     }
    393 
    394     pub fn data(&mut self) -> &mut CompactionData {
    395         match self {
    396             CompactionCtx::Active(compaction_handler) => compaction_handler.data,
    397             CompactionCtx::Inactive {
    398                 data,
    399                 session: _,
    400                 subs: _,
    401             } => data,
    402         }
    403     }
    404 }
    405 struct SharedCtx<'a> {
    406     data: &'a mut CompactionData,
    407     session: &'a mut CompactionSubSession,
    408     subs: &'a OutboxSubscriptions,
    409 }
    410 
    411 struct CompactionHandler<'a> {
    412     relay: &'a mut WebsocketRelay,
    413     data: &'a mut CompactionData,
    414     subs: &'a OutboxSubscriptions,
    415     pub session: CompactionSubSession,
    416 }
    417 
    418 impl<'a> Drop for CompactionHandler<'a> {
    419     #[profiling::function]
    420     fn drop(&mut self) {
    421         for (id, task) in &self.session.tasks {
    422             match task {
    423                 SubSessionTask::Touched => {
    424                     let Some(data) = self.data.relay_subs.get_mut(id) else {
    425                         continue;
    426                     };
    427 
    428                     let filters = self.subs.filters_all(&data.requests.requests);
    429 
    430                     if filters.is_empty() {
    431                         self.relay.conn.send(&ClientMessage::close(id.0.clone()));
    432                     } else {
    433                         self.relay
    434                             .conn
    435                             .send(&ClientMessage::req(id.0.clone(), filters));
    436                     }
    437                 }
    438                 SubSessionTask::Removed => {
    439                     self.relay.conn.send(&ClientMessage::close(id.0.clone()));
    440                 }
    441                 SubSessionTask::New => {
    442                     let Some(data) = self.data.relay_subs.get(id) else {
    443                         continue;
    444                     };
    445 
    446                     let filters = self.subs.filters_all(&data.requests.requests);
    447                     self.relay
    448                         .conn
    449                         .send(&ClientMessage::req(id.0.clone(), filters));
    450                 }
    451             }
    452         }
    453     }
    454 }
    455 
    456 fn are_filters_empty(filters: &Vec<Filter>) -> bool {
    457     if filters.is_empty() {
    458         return true;
    459     }
    460 
    461     for filter in filters {
    462         if filter.num_elements() != 0 {
    463             return false;
    464         }
    465     }
    466 
    467     true
    468 }
    469 
    470 impl<'a> CompactionHandler<'a> {
    471     pub fn new(
    472         relay: &'a mut WebsocketRelay,
    473         data: &'a mut CompactionData,
    474         subs: &'a OutboxSubscriptions,
    475     ) -> Self {
    476         Self {
    477             relay,
    478             data,
    479             session: CompactionSubSession::default(),
    480             subs,
    481         }
    482     }
    483 }
    484 
    485 /// Represents a singular REQ to a relay
    486 struct RelaySubData {
    487     requests: SubRequests,
    488     status: RelayReqStatus,
    489     sub_pass: SubPass,
    490 }
    491 
    492 #[derive(Default)]
    493 struct SubRequests {
    494     pub requests: HashSet<OutboxSubId>,
    495 }
    496 
    497 impl SubRequests {
    498     #[profiling::function]
    499     pub fn add(&mut self, id: OutboxSubId) {
    500         self.requests.insert(id);
    501     }
    502 
    503     pub fn remove(&mut self, id: &OutboxSubId) -> bool {
    504         self.requests.remove(id)
    505     }
    506 
    507     pub fn is_empty(&self) -> bool {
    508         self.requests.is_empty()
    509     }
    510 
    511     pub fn can_fit(
    512         &self,
    513         subs: &OutboxSubscriptions,
    514         new: &OutboxSubId,
    515         json_limit: usize,
    516     ) -> bool {
    517         let Some(new_size) = subs.json_size(new) else {
    518             return true;
    519         };
    520 
    521         let cur_json_size = subs.json_size_sum(&self.requests);
    522 
    523         // `["REQ","abc...123"]`;
    524         //  12345678  ...    90 -> 10 characters excluding the UUID
    525         cur_json_size + new_size + 10 + RelayReqId::byte_len() <= json_limit
    526     }
    527 }
    528 
    529 #[derive(Default)]
    530 pub struct CompactionSession {
    531     // Number of subs which should be free after ingestion. Subs will compact enough to free up that number of subs
    532     // OR as much as possible without dropping any existing subs
    533     request_free: usize,
    534     tasks: HashMap<OutboxSubId, RelayTask>,
    535 }
    536 
    537 impl CompactionSession {
    538     pub fn request_free_subs(&mut self, num_free: usize) {
    539         self.request_free = num_free;
    540     }
    541 
    542     pub fn unsub(&mut self, unsub: OutboxSubId) {
    543         self.tasks.insert(unsub, RelayTask::Unsubscribe);
    544     }
    545 
    546     pub fn sub(&mut self, id: OutboxSubId) {
    547         self.tasks.insert(id, RelayTask::Subscribe);
    548     }
    549 
    550     pub fn is_empty(&self) -> bool {
    551         self.tasks.is_empty() && self.request_free == 0
    552     }
    553 }
    554 
    555 #[cfg(test)]
    556 mod tests {
    557     use super::*;
    558     use crate::relay::{RelayUrlPkgs, SubscribeTask};
    559     use hashbrown::HashSet;
    560 
    561     // ==================== CompactionData tests ====================
    562 
    563     #[test]
    564     fn compaction_data_default_empty() {
    565         let data = CompactionData::default();
    566         assert_eq!(data.num_subs(), 0);
    567     }
    568 
    569     #[test]
    570     fn compaction_data_req_status_none_for_unknown() {
    571         let data = CompactionData::default();
    572         assert!(data.req_status(&OutboxSubId(999)).is_none());
    573     }
    574 
    575     #[test]
    576     fn compaction_data_has_eose_false_for_unknown() {
    577         let data = CompactionData::default();
    578         assert!(!data.has_eose(&OutboxSubId(999)));
    579     }
    580 
    581     #[test]
    582     fn compaction_data_set_req_status_ignores_unknown_sid() {
    583         let mut data = CompactionData::default();
    584         // Should not panic or error when setting status for unknown sid
    585         data.set_req_status("unknown-sid", RelayReqStatus::Eose);
    586     }
    587 
    588     #[test]
    589     fn compaction_data_ids_returns_sub_ids() {
    590         let mut data = CompactionData::default();
    591         let mut guardian = SubPassGuardian::new(1);
    592         let pass = guardian.take_pass().unwrap();
    593 
    594         let id = OutboxSubId(7);
    595         let relay_id = RelayReqId::from("req-123");
    596         let mut requests = SubRequests::default();
    597         requests.add(id);
    598         data.relay_subs.insert(
    599             relay_id.clone(),
    600             RelaySubData {
    601                 requests,
    602                 status: RelayReqStatus::InitialQuery,
    603                 sub_pass: pass,
    604             },
    605         );
    606 
    607         let ids = data.ids(&relay_id);
    608         assert!(ids.is_some());
    609         assert!(ids.unwrap().contains(&id));
    610     }
    611 
    612     #[test]
    613     fn compaction_data_set_req_status_updates_status() {
    614         let mut data = CompactionData::default();
    615 
    616         // Manually set up a relay subscription
    617         let relay_id = RelayReqId::from("test-sid");
    618         let mut guardian = SubPassGuardian::new(1);
    619         let pass = guardian.take_pass().unwrap();
    620 
    621         data.relay_subs.insert(
    622             relay_id.clone(),
    623             RelaySubData {
    624                 requests: SubRequests::default(),
    625                 status: RelayReqStatus::InitialQuery,
    626                 sub_pass: pass,
    627             },
    628         );
    629 
    630         // Set EOSE should update status
    631         data.set_req_status("test-sid", RelayReqStatus::Eose);
    632 
    633         // Verify status was set
    634         let sub_data = data.relay_subs.get(&relay_id).unwrap();
    635         assert_eq!(sub_data.status, RelayReqStatus::Eose);
    636     }
    637 
    638     // ==================== SubRequests tests ====================
    639 
    640     /// can_fit returns true when combined JSON size is under the limit.
    641     #[test]
    642     fn sub_requests_can_fit() {
    643         use crate::relay::{RelayUrlPkgs, SubscribeTask};
    644         use hashbrown::HashSet;
    645 
    646         let mut subs = OutboxSubscriptions::default();
    647         subs.new_subscription(
    648             OutboxSubId(0),
    649             SubscribeTask {
    650                 filters: vec![Filter::new().kinds(vec![1]).build()],
    651                 relays: RelayUrlPkgs::new(HashSet::new()),
    652             },
    653             false,
    654         );
    655 
    656         let requests = SubRequests::default();
    657 
    658         assert!(requests.can_fit(&subs, &OutboxSubId(0), 1_000_000));
    659         assert!(!requests.can_fit(&subs, &OutboxSubId(0), 5));
    660     }
    661 
    662     // ==================== CompactionSession tests ====================
    663 
    664     #[test]
    665     fn compaction_session_default() {
    666         let session = CompactionSession::default();
    667         assert_eq!(session.request_free, 0);
    668         assert!(session.tasks.is_empty());
    669     }
    670 
    671     #[test]
    672     fn compaction_session_unsub() {
    673         let mut session = CompactionSession::default();
    674         session.unsub(OutboxSubId(42));
    675 
    676         assert!(session.tasks.contains_key(&OutboxSubId(42)));
    677         match session.tasks.get(&OutboxSubId(42)) {
    678             Some(RelayTask::Unsubscribe) => (),
    679             _ => panic!("Expected Unsubscribe task"),
    680         }
    681     }
    682 
    683     #[test]
    684     fn compaction_session_sub() {
    685         let mut session = CompactionSession::default();
    686         session.sub(OutboxSubId(1));
    687 
    688         assert!(session.tasks.contains_key(&OutboxSubId(1)));
    689         assert!(matches!(
    690             session.tasks.get(&OutboxSubId(1)),
    691             Some(RelayTask::Subscribe)
    692         ));
    693     }
    694 
    695     // ==================== take_smallest_sub_reqs tests ====================
    696 
    697     #[test]
    698     fn take_smallest_returns_none_for_empty() {
    699         let subs = OutboxSubscriptions::default();
    700         let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new();
    701         assert!(take_smallest_sub_reqs(&subs, &mut data).is_none());
    702     }
    703 
    704     /// Returns the relay sub with the smallest combined JSON size.
    705     #[test]
    706     fn take_smallest_returns_smallest_by_json_size() {
    707         use crate::relay::{RelayUrlPkgs, SubscribeTask};
    708         use hashbrown::HashSet;
    709 
    710         // Register subscriptions with different JSON sizes
    711         let mut subs = OutboxSubscriptions::default();
    712         subs.new_subscription(
    713             OutboxSubId(0),
    714             SubscribeTask {
    715                 filters: vec![Filter::new().kinds(vec![1]).build()],
    716                 relays: RelayUrlPkgs::new(HashSet::new()),
    717             },
    718             false,
    719         );
    720         subs.new_subscription(
    721             OutboxSubId(1),
    722             SubscribeTask {
    723                 filters: vec![Filter::new()
    724                     .kinds(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    725                     .build()],
    726                 relays: RelayUrlPkgs::new(HashSet::new()),
    727             },
    728             false,
    729         );
    730 
    731         let mut guardian = SubPassGuardian::new(2);
    732 
    733         // Small relay sub contains id 0
    734         let mut small_requests = SubRequests::default();
    735         small_requests.add(OutboxSubId(0));
    736 
    737         // Large relay sub contains id 1
    738         let mut large_requests = SubRequests::default();
    739         large_requests.add(OutboxSubId(1));
    740 
    741         let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new();
    742         data.insert(
    743             RelayReqId::from("small"),
    744             RelaySubData {
    745                 requests: small_requests,
    746                 status: RelayReqStatus::InitialQuery,
    747                 sub_pass: guardian.take_pass().unwrap(),
    748             },
    749         );
    750         data.insert(
    751             RelayReqId::from("large"),
    752             RelaySubData {
    753                 requests: large_requests,
    754                 status: RelayReqStatus::InitialQuery,
    755                 sub_pass: guardian.take_pass().unwrap(),
    756             },
    757         );
    758 
    759         let (id, _) = take_smallest_sub_reqs(&subs, &mut data).unwrap();
    760         assert_eq!(id.0, "small");
    761         assert_eq!(data.len(), 1);
    762     }
    763 
    764     #[test]
    765     fn take_smallest_removes_from_map() {
    766         let subs = OutboxSubscriptions::default();
    767         let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new();
    768         let mut guardian = SubPassGuardian::new(1);
    769 
    770         data.insert(
    771             RelayReqId::from("only"),
    772             RelaySubData {
    773                 requests: SubRequests::default(),
    774                 status: RelayReqStatus::InitialQuery,
    775                 sub_pass: guardian.take_pass().unwrap(),
    776             },
    777         );
    778 
    779         let result = take_smallest_sub_reqs(&subs, &mut data);
    780         assert!(result.is_some());
    781         assert!(data.is_empty());
    782     }
    783 
    784     // ==================== CompactionRelay tests ====================
    785 
    786     /// Requesting free subs when there's nothing to compact has no effect.
    787     #[test]
    788     fn compact_returns_none_when_no_subs() {
    789         let subs = OutboxSubscriptions::default();
    790         let mut data = CompactionData::default();
    791         let mut guardian = SubPassGuardian::new(5);
    792         let json_limit = 100000;
    793 
    794         let initial_passes = guardian.available_passes();
    795 
    796         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    797         let mut session = CompactionSession::default();
    798         session.request_free_subs(1);
    799         relay.ingest_session(session);
    800 
    801         assert_eq!(guardian.available_passes(), initial_passes);
    802     }
    803 
    804     /// Compacting frees a pass and redistributes requests to remaining subs.
    805     #[test]
    806     fn compact_frees_pass_and_redistributes() {
    807         use crate::relay::{RelayUrlPkgs, SubscribeTask};
    808         use hashbrown::HashSet;
    809 
    810         let mut subs = OutboxSubscriptions::default();
    811         subs.new_subscription(
    812             OutboxSubId(0),
    813             SubscribeTask {
    814                 filters: vec![Filter::new().kinds(vec![1]).build()],
    815                 relays: RelayUrlPkgs::new(HashSet::new()),
    816             },
    817             false,
    818         );
    819         subs.new_subscription(
    820             OutboxSubId(1),
    821             SubscribeTask {
    822                 filters: vec![Filter::new()
    823                     .kinds(vec![2, 3, 4, 5, 6, 7, 8, 9, 10])
    824                     .build()],
    825                 relays: RelayUrlPkgs::new(HashSet::new()),
    826             },
    827             false,
    828         );
    829 
    830         let mut data = CompactionData::default();
    831         let mut guardian = SubPassGuardian::new(5);
    832         let json_limit = 100000;
    833 
    834         // Create 2 relay subs
    835         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    836         let mut session = CompactionSession::default();
    837         session.sub(OutboxSubId(0));
    838         session.sub(OutboxSubId(1));
    839         relay.ingest_session(session);
    840 
    841         assert_eq!(data.relay_subs.len(), 2);
    842         assert_eq!(guardian.available_passes(), 3); // 5 - 2
    843 
    844         // Request 4 free passes - must compact 1
    845         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    846         let mut session = CompactionSession::default();
    847         session.request_free_subs(4);
    848         relay.ingest_session(session);
    849 
    850         assert_eq!(data.relay_subs.len(), 1);
    851         assert_eq!(guardian.available_passes(), 4);
    852 
    853         let remaining = data.relay_subs.values().next().unwrap();
    854         assert_eq!(remaining.requests.requests.len(), 2);
    855     }
    856 
    857     /// When compaction redistributes a request but the remaining sub
    858     /// doesn't have room, the request goes to the queue.
    859     #[test]
    860     fn place_queues_when_no_room() {
    861         use crate::relay::{RelayUrlPkgs, SubscribeTask};
    862         use hashbrown::HashSet;
    863 
    864         let mut subs = OutboxSubscriptions::default();
    865         subs.new_subscription(
    866             OutboxSubId(0),
    867             SubscribeTask {
    868                 filters: vec![Filter::new().kinds(vec![1]).build()],
    869                 relays: RelayUrlPkgs::new(HashSet::new()),
    870             },
    871             false,
    872         );
    873         subs.new_subscription(
    874             OutboxSubId(1),
    875             SubscribeTask {
    876                 filters: vec![Filter::new().kinds(vec![2]).build()],
    877                 relays: RelayUrlPkgs::new(HashSet::new()),
    878             },
    879             false,
    880         );
    881 
    882         // Set limit so combined filters won't fit in one REQ
    883         let size0 = subs.json_size(&OutboxSubId(0)).unwrap();
    884         let size1 = subs.json_size(&OutboxSubId(1)).unwrap();
    885         let json_limit = size0 + size1 - 1;
    886 
    887         let mut data = CompactionData::default();
    888         let mut guardian = SubPassGuardian::new(2);
    889 
    890         // Create 2 relay subs at capacity
    891         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    892         let mut session = CompactionSession::default();
    893         session.sub(OutboxSubId(0));
    894         session.sub(OutboxSubId(1));
    895         relay.ingest_session(session);
    896 
    897         assert_eq!(data.relay_subs.len(), 2);
    898         assert!(data.queue.is_empty());
    899 
    900         // Compact 1 - redistributed request won't fit
    901         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    902         let mut session = CompactionSession::default();
    903         session.request_free_subs(1);
    904         relay.ingest_session(session);
    905 
    906         assert_eq!(data.relay_subs.len(), 1);
    907         assert!(!data.queue.is_empty());
    908     }
    909 
    910     /// When no passes are available, requests are placed on existing relay subs.
    911     #[test]
    912     fn new_sub_places_on_existing_when_no_passes() {
    913         use crate::relay::{RelayUrlPkgs, SubscribeTask};
    914         use hashbrown::HashSet;
    915 
    916         let mut subs = OutboxSubscriptions::default();
    917         subs.new_subscription(
    918             OutboxSubId(0),
    919             SubscribeTask {
    920                 filters: vec![Filter::new().kinds(vec![1]).build()],
    921                 relays: RelayUrlPkgs::new(HashSet::new()),
    922             },
    923             false,
    924         );
    925         subs.new_subscription(
    926             OutboxSubId(1),
    927             SubscribeTask {
    928                 filters: vec![Filter::new().kinds(vec![2]).build()],
    929                 relays: RelayUrlPkgs::new(HashSet::new()),
    930             },
    931             false,
    932         );
    933 
    934         let mut data = CompactionData::default();
    935         let mut guardian = SubPassGuardian::new(1); // Only 1 pass
    936         let json_limit = 100000;
    937 
    938         // Add 2 requests with only 1 pass - second must go on existing
    939         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    940         let mut session = CompactionSession::default();
    941         session.sub(OutboxSubId(0));
    942         session.sub(OutboxSubId(1));
    943         relay.ingest_session(session);
    944 
    945         assert_eq!(data.relay_subs.len(), 1);
    946         let sub = data.relay_subs.values().next().unwrap();
    947         assert_eq!(sub.requests.requests.len(), 2);
    948     }
    949 
    950     /// Subscriptions placed onto an existing compacted REQ must register
    951     /// request-to-relay mapping so a later unsubscribe updates the correct REQ.
    952     #[test]
    953     fn unsubscribe_after_place_on_existing_removes_request() {
    954         use crate::relay::{RelayUrlPkgs, SubscribeTask};
    955         use hashbrown::HashSet;
    956 
    957         let mut subs = OutboxSubscriptions::default();
    958         subs.new_subscription(
    959             OutboxSubId(0),
    960             SubscribeTask {
    961                 filters: vec![Filter::new().kinds(vec![1]).build()],
    962                 relays: RelayUrlPkgs::new(HashSet::new()),
    963             },
    964             false,
    965         );
    966         subs.new_subscription(
    967             OutboxSubId(1),
    968             SubscribeTask {
    969                 filters: vec![Filter::new().kinds(vec![2]).build()],
    970                 relays: RelayUrlPkgs::new(HashSet::new()),
    971             },
    972             false,
    973         );
    974 
    975         let mut data = CompactionData::default();
    976         let mut guardian = SubPassGuardian::new(1); // Force second sub onto existing REQ
    977         let json_limit = 100000;
    978 
    979         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    980         let mut session = CompactionSession::default();
    981         session.sub(OutboxSubId(0));
    982         session.sub(OutboxSubId(1));
    983         relay.ingest_session(session);
    984 
    985         assert_eq!(data.relay_subs.len(), 1);
    986         let relay_id = data.relay_subs.keys().next().cloned().unwrap();
    987         assert_eq!(data.request_to_sid.get(&OutboxSubId(0)), Some(&relay_id));
    988         assert_eq!(data.request_to_sid.get(&OutboxSubId(1)), Some(&relay_id));
    989 
    990         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
    991         let mut session = CompactionSession::default();
    992         session.unsub(OutboxSubId(1));
    993         relay.ingest_session(session);
    994 
    995         assert!(data.queue.is_empty());
    996         assert_eq!(data.relay_subs.len(), 1);
    997         let sub = data.relay_subs.get(&relay_id).unwrap();
    998         assert_eq!(sub.requests.requests.len(), 1);
    999         assert!(sub.requests.requests.contains(&OutboxSubId(0)));
   1000         assert!(!sub.requests.requests.contains(&OutboxSubId(1)));
   1001         assert_eq!(data.request_to_sid.get(&OutboxSubId(0)), Some(&relay_id));
   1002         assert!(!data.request_to_sid.contains_key(&OutboxSubId(1)));
   1003     }
   1004 
   1005     /// When requesting multiple free passes, multiple subs are compacted
   1006     /// and all requests are consolidated into fewer relay subs.
   1007     #[test]
   1008     fn compact_multiple_subs() {
   1009         let mut data = CompactionData::default();
   1010         let mut guardian = SubPassGuardian::new(3);
   1011         let json_limit = 100000;
   1012         let mut subs = OutboxSubscriptions::default();
   1013         for i in 0..3 {
   1014             subs.new_subscription(
   1015                 OutboxSubId(i),
   1016                 SubscribeTask {
   1017                     filters: vec![Filter::new().kinds(vec![i as u64 + 1]).build()],
   1018                     relays: RelayUrlPkgs::new(HashSet::new()),
   1019                 },
   1020                 false,
   1021             );
   1022         }
   1023 
   1024         // Create 3 subs and request 2 free in same session
   1025         let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
   1026         let mut session = CompactionSession::default();
   1027         for i in 0..3 {
   1028             session.sub(OutboxSubId(i));
   1029         }
   1030         session.request_free_subs(2);
   1031         relay.ingest_session(session);
   1032 
   1033         // Should compact down to 1 sub with all 3 requests
   1034         assert_eq!(data.relay_subs.len(), 1);
   1035         assert_eq!(guardian.available_passes(), 2);
   1036 
   1037         let sub = data.relay_subs.values().next().unwrap();
   1038         assert_eq!(sub.requests.requests.len(), 3);
   1039     }
   1040 }