commit 2dd8e1879b57901096844ff831f107475e18a628
parent de84dc2a5f3c6ce58a29cab7b5a8495487961a80
Author: kernelkind <kernelkind@gmail.com>
Date: Thu, 5 Feb 2026 17:19:36 -0500
test(outbox): `CompactionRelay`
Signed-off-by: kernelkind <kernelkind@gmail.com>
Diffstat:
1 file changed, 487 insertions(+), 0 deletions(-)
diff --git a/crates/enostr/src/relay/compaction.rs b/crates/enostr/src/relay/compaction.rs
@@ -547,3 +547,490 @@ impl CompactionSession {
self.tasks.is_empty() && self.request_free == 0
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ // ==================== CompactionData tests ====================
+
+ #[test]
+ fn compaction_data_default_empty() {
+ let data = CompactionData::default();
+ assert_eq!(data.num_subs(), 0);
+ }
+
+ #[test]
+ fn compaction_data_req_status_none_for_unknown() {
+ let data = CompactionData::default();
+ assert!(data.req_status(&OutboxSubId(999)).is_none());
+ }
+
+ #[test]
+ fn compaction_data_has_eose_false_for_unknown() {
+ let data = CompactionData::default();
+ assert!(!data.has_eose(&OutboxSubId(999)));
+ }
+
+ #[test]
+ fn compaction_data_set_req_status_ignores_unknown_sid() {
+ let mut data = CompactionData::default();
+ // Should not panic or error when setting status for unknown sid
+ data.set_req_status("unknown-sid", RelayReqStatus::Eose);
+ }
+
+ #[test]
+ fn compaction_data_ids_returns_sub_ids() {
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(1);
+ let pass = guardian.take_pass().unwrap();
+
+ let id = OutboxSubId(7);
+ let relay_id = RelayReqId::from("req-123");
+ let mut requests = SubRequests::default();
+ requests.add(id);
+ data.relay_subs.insert(
+ relay_id.clone(),
+ RelaySubData {
+ requests,
+ status: RelayReqStatus::InitialQuery,
+ sub_pass: pass,
+ },
+ );
+
+ let ids = data.ids(&relay_id);
+ assert!(ids.is_some());
+ assert!(ids.unwrap().contains(&id));
+ }
+
+ #[test]
+ fn compaction_data_set_req_status_updates_status() {
+ let mut data = CompactionData::default();
+
+ // Manually set up a relay subscription
+ let relay_id = RelayReqId::from("test-sid");
+ let mut guardian = SubPassGuardian::new(1);
+ let pass = guardian.take_pass().unwrap();
+
+ data.relay_subs.insert(
+ relay_id.clone(),
+ RelaySubData {
+ requests: SubRequests::default(),
+ status: RelayReqStatus::InitialQuery,
+ sub_pass: pass,
+ },
+ );
+
+ // Set EOSE should update status
+ data.set_req_status("test-sid", RelayReqStatus::Eose);
+
+ // Verify status was set
+ let sub_data = data.relay_subs.get(&relay_id).unwrap();
+ assert_eq!(sub_data.status, RelayReqStatus::Eose);
+ }
+
+ // ==================== SubRequests tests ====================
+
+ /// can_fit returns true when combined JSON size is under the limit.
+ #[test]
+ fn sub_requests_can_fit() {
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ let mut subs = OutboxSubscriptions::default();
+ subs.new_subscription(
+ OutboxSubId(0),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+
+ let requests = SubRequests::default();
+
+ assert!(requests.can_fit(&subs, &OutboxSubId(0), 1_000_000));
+ assert!(!requests.can_fit(&subs, &OutboxSubId(0), 5));
+ }
+
+ // ==================== CompactionSession tests ====================
+
+ #[test]
+ fn compaction_session_default() {
+ let session = CompactionSession::default();
+ assert_eq!(session.request_free, 0);
+ assert!(session.tasks.is_empty());
+ }
+
+ #[test]
+ fn compaction_session_unsub() {
+ let mut session = CompactionSession::default();
+ session.unsub(OutboxSubId(42));
+
+ assert!(session.tasks.contains_key(&OutboxSubId(42)));
+ match session.tasks.get(&OutboxSubId(42)) {
+ Some(RelayTask::Unsubscribe) => (),
+ _ => panic!("Expected Unsubscribe task"),
+ }
+ }
+
+ #[test]
+ fn compaction_session_sub() {
+ let mut session = CompactionSession::default();
+ session.sub(OutboxSubId(1));
+
+ assert!(session.tasks.contains_key(&OutboxSubId(1)));
+ assert!(matches!(
+ session.tasks.get(&OutboxSubId(1)),
+ Some(RelayTask::Subscribe)
+ ));
+ }
+
+ // ==================== take_smallest_sub_reqs tests ====================
+
+ #[test]
+ fn take_smallest_returns_none_for_empty() {
+ let subs = OutboxSubscriptions::default();
+ let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new();
+ assert!(take_smallest_sub_reqs(&subs, &mut data).is_none());
+ }
+
+ /// Returns the relay sub with the smallest combined JSON size.
+ #[test]
+ fn take_smallest_returns_smallest_by_json_size() {
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ // Register subscriptions with different JSON sizes
+ let mut subs = OutboxSubscriptions::default();
+ subs.new_subscription(
+ OutboxSubId(0),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+ subs.new_subscription(
+ OutboxSubId(1),
+ SubscribeTask {
+ filters: vec![Filter::new()
+ .kinds(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
+ .build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+
+ let mut guardian = SubPassGuardian::new(2);
+
+ // Small relay sub contains id 0
+ let mut small_requests = SubRequests::default();
+ small_requests.add(OutboxSubId(0));
+
+ // Large relay sub contains id 1
+ let mut large_requests = SubRequests::default();
+ large_requests.add(OutboxSubId(1));
+
+ let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new();
+ data.insert(
+ RelayReqId::from("small"),
+ RelaySubData {
+ requests: small_requests,
+ status: RelayReqStatus::InitialQuery,
+ sub_pass: guardian.take_pass().unwrap(),
+ },
+ );
+ data.insert(
+ RelayReqId::from("large"),
+ RelaySubData {
+ requests: large_requests,
+ status: RelayReqStatus::InitialQuery,
+ sub_pass: guardian.take_pass().unwrap(),
+ },
+ );
+
+ let (id, _) = take_smallest_sub_reqs(&subs, &mut data).unwrap();
+ assert_eq!(id.0, "small");
+ assert_eq!(data.len(), 1);
+ }
+
+ #[test]
+ fn take_smallest_removes_from_map() {
+ let subs = OutboxSubscriptions::default();
+ let mut data: HashMap<RelayReqId, RelaySubData> = HashMap::new();
+ let mut guardian = SubPassGuardian::new(1);
+
+ data.insert(
+ RelayReqId::from("only"),
+ RelaySubData {
+ requests: SubRequests::default(),
+ status: RelayReqStatus::InitialQuery,
+ sub_pass: guardian.take_pass().unwrap(),
+ },
+ );
+
+ let result = take_smallest_sub_reqs(&subs, &mut data);
+ assert!(result.is_some());
+ assert!(data.is_empty());
+ }
+
+ // ==================== CompactionRelay tests ====================
+
+ /// Requesting free subs when there's nothing to compact has no effect.
+ #[test]
+ fn compact_returns_none_when_no_subs() {
+ let subs = OutboxSubscriptions::default();
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(5);
+ let json_limit = 100000;
+
+ let initial_passes = guardian.available_passes();
+
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.request_free_subs(1);
+ relay.ingest_session(session);
+
+ assert_eq!(guardian.available_passes(), initial_passes);
+ }
+
+ /// Compacting frees a pass and redistributes requests to remaining subs.
+ #[test]
+ fn compact_frees_pass_and_redistributes() {
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ let mut subs = OutboxSubscriptions::default();
+ subs.new_subscription(
+ OutboxSubId(0),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+ subs.new_subscription(
+ OutboxSubId(1),
+ SubscribeTask {
+ filters: vec![Filter::new()
+ .kinds(vec![2, 3, 4, 5, 6, 7, 8, 9, 10])
+ .build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(5);
+ let json_limit = 100000;
+
+ // Create 2 relay subs
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.sub(OutboxSubId(0));
+ session.sub(OutboxSubId(1));
+ relay.ingest_session(session);
+
+ assert_eq!(data.relay_subs.len(), 2);
+ assert_eq!(guardian.available_passes(), 3); // 5 - 2
+
+ // Request 4 free passes - must compact 1
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.request_free_subs(4);
+ relay.ingest_session(session);
+
+ assert_eq!(data.relay_subs.len(), 1);
+ assert_eq!(guardian.available_passes(), 4);
+
+ let remaining = data.relay_subs.values().next().unwrap();
+ assert_eq!(remaining.requests.requests.len(), 2);
+ }
+
+ /// When compaction redistributes a request but the remaining sub
+ /// doesn't have room, the request goes to the queue.
+ #[test]
+ fn place_queues_when_no_room() {
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ let mut subs = OutboxSubscriptions::default();
+ subs.new_subscription(
+ OutboxSubId(0),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+ subs.new_subscription(
+ OutboxSubId(1),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![2]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+
+ // Set limit so combined filters won't fit in one REQ
+ let size0 = subs.json_size(&OutboxSubId(0)).unwrap();
+ let size1 = subs.json_size(&OutboxSubId(1)).unwrap();
+ let json_limit = size0 + size1 - 1;
+
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(2);
+
+ // Create 2 relay subs at capacity
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.sub(OutboxSubId(0));
+ session.sub(OutboxSubId(1));
+ relay.ingest_session(session);
+
+ assert_eq!(data.relay_subs.len(), 2);
+ assert!(data.queue.is_empty());
+
+ // Compact 1 - redistributed request won't fit
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.request_free_subs(1);
+ relay.ingest_session(session);
+
+ assert_eq!(data.relay_subs.len(), 1);
+ assert!(!data.queue.is_empty());
+ }
+
+ /// When no passes are available, requests are placed on existing relay subs.
+ #[test]
+ fn new_sub_places_on_existing_when_no_passes() {
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ let mut subs = OutboxSubscriptions::default();
+ subs.new_subscription(
+ OutboxSubId(0),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+ subs.new_subscription(
+ OutboxSubId(1),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![2]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(1); // Only 1 pass
+ let json_limit = 100000;
+
+ // Add 2 requests with only 1 pass - second must go on existing
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.sub(OutboxSubId(0));
+ session.sub(OutboxSubId(1));
+ relay.ingest_session(session);
+
+ assert_eq!(data.relay_subs.len(), 1);
+ let sub = data.relay_subs.values().next().unwrap();
+ assert_eq!(sub.requests.requests.len(), 2);
+ }
+
+ /// Subscriptions placed onto an existing compacted REQ must register
+ /// request-to-relay mapping so a later unsubscribe updates the correct REQ.
+ #[test]
+ fn unsubscribe_after_place_on_existing_removes_request() {
+ use crate::relay::{RelayUrlPkgs, SubscribeTask};
+ use hashbrown::HashSet;
+
+ let mut subs = OutboxSubscriptions::default();
+ subs.new_subscription(
+ OutboxSubId(0),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+ subs.new_subscription(
+ OutboxSubId(1),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![2]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(1); // Force second sub onto existing REQ
+ let json_limit = 100000;
+
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.sub(OutboxSubId(0));
+ session.sub(OutboxSubId(1));
+ relay.ingest_session(session);
+
+ assert_eq!(data.relay_subs.len(), 1);
+ let relay_id = data.relay_subs.keys().next().cloned().unwrap();
+ assert_eq!(data.request_to_sid.get(&OutboxSubId(0)), Some(&relay_id));
+ assert_eq!(data.request_to_sid.get(&OutboxSubId(1)), Some(&relay_id));
+
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ session.unsub(OutboxSubId(1));
+ relay.ingest_session(session);
+
+ assert!(data.queue.is_empty());
+ assert_eq!(data.relay_subs.len(), 1);
+ let sub = data.relay_subs.get(&relay_id).unwrap();
+ assert_eq!(sub.requests.requests.len(), 1);
+ assert!(sub.requests.requests.contains(&OutboxSubId(0)));
+ assert!(!sub.requests.requests.contains(&OutboxSubId(1)));
+ assert_eq!(data.request_to_sid.get(&OutboxSubId(0)), Some(&relay_id));
+ assert!(!data.request_to_sid.contains_key(&OutboxSubId(1)));
+ }
+
+ /// When requesting multiple free passes, multiple subs are compacted
+ /// and all requests are consolidated into fewer relay subs.
+ #[test]
+ fn compact_multiple_subs() {
+ let mut data = CompactionData::default();
+ let mut guardian = SubPassGuardian::new(3);
+ let json_limit = 100000;
+ let mut subs = OutboxSubscriptions::default();
+ for i in 0..3 {
+ subs.new_subscription(
+ OutboxSubId(i),
+ SubscribeTask {
+ filters: vec![Filter::new().kinds(vec![i as u64 + 1]).build()],
+ relays: RelayUrlPkgs::new(HashSet::new()),
+ },
+ false,
+ );
+ }
+
+ // Create 3 subs and request 2 free in same session
+ let relay = CompactionRelay::new(None, &mut data, json_limit, &mut guardian, &subs);
+ let mut session = CompactionSession::default();
+ for i in 0..3 {
+ session.sub(OutboxSubId(i));
+ }
+ session.request_free_subs(2);
+ relay.ingest_session(session);
+
+ // Should compact down to 1 sub with all 3 requests
+ assert_eq!(data.relay_subs.len(), 1);
+ assert_eq!(guardian.available_passes(), 2);
+
+ let sub = data.relay_subs.values().next().unwrap();
+ assert_eq!(sub.requests.requests.len(), 3);
+ }
+}