notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

outbox_integration.rs (20949B)


      1 //! Integration tests for the Outbox relay system
      2 //!
      3 //! These tests use `nostr-relay-builder::LocalRelay` to run a real relay on localhost
      4 //! and test the full subscription lifecycle, EOSE propagation, and multi-relay coordination.
      5 
      6 use enostr::{
      7     NormRelayUrl, OutboxPool, OutboxSessionHandler, OutboxSubId, RelayReqStatus, RelayStatus,
      8     RelayUrlPkgs, Wakeup,
      9 };
     10 use hashbrown::HashSet;
     11 use nostr_relay_builder::{LocalRelay, RelayBuilder};
     12 use nostrdb::Filter;
     13 use std::sync::Once;
     14 use std::time::Duration;
     15 
     16 static TRACING_INIT: Once = Once::new();
     17 
     18 /// Initialize tracing for tests (only runs once even if called multiple times)
     19 fn init_tracing() {
     20     TRACING_INIT.call_once(|| {
     21         tracing_subscriber::fmt()
     22             .with_env_filter(
     23                 tracing_subscriber::EnvFilter::from_default_env()
     24                     .add_directive("enostr=debug".parse().unwrap()),
     25             )
     26             .with_test_writer()
     27             .init();
     28     });
     29 }
     30 
     31 /// A mock Wakeup implementation for integration tests
     32 #[derive(Clone, Default)]
     33 pub struct MockWakeup {}
     34 
     35 impl Wakeup for MockWakeup {
     36     fn wake(&self) {}
     37 }
     38 
     39 /// Helper to create a LocalRelay with default settings for tests.
     40 /// Returns the relay handle (must be kept alive) and its normalized URL.
     41 async fn create_test_relay() -> (LocalRelay, NormRelayUrl) {
     42     let relay = LocalRelay::run(RelayBuilder::default())
     43         .await
     44         .expect("failed to start relay");
     45 
     46     let url_str = relay.url();
     47     tracing::info!("LocalRelay listening at {}", url_str);
     48 
     49     let url = NormRelayUrl::new(&url_str).expect("valid relay url");
     50     (relay, url)
     51 }
     52 
     53 /// Polls the pool until the provided predicate returns true or the attempt limit is reached.
     54 /// Returns the attempt count and whether the predicate was ultimately satisfied.
     55 async fn pump_pool_until<F>(
     56     pool: &mut OutboxPool,
     57     max_attempts: usize,
     58     sleep_duration: Duration,
     59     mut predicate: F,
     60 ) -> bool
     61 where
     62     F: FnMut(&mut OutboxPool) -> bool,
     63 {
     64     let mut attempts = 0;
     65     for attempt in 0..max_attempts {
     66         pool.try_recv(10, |_| {});
     67         if predicate(pool) {
     68             return true;
     69         }
     70         tokio::time::sleep(sleep_duration).await;
     71         attempts = attempt;
     72     }
     73 
     74     tracing::trace!("completed pool pump in {attempts} attempts");
     75 
     76     predicate(pool)
     77 }
     78 
     79 async fn default_pool_pump<F>(pool: &mut OutboxPool, predicate: F) -> bool
     80 where
     81     F: FnMut(&mut OutboxPool) -> bool,
     82 {
     83     pump_pool_until(pool, 100, Duration::from_millis(15), predicate).await
     84 }
     85 
     86 // ==================== Full Subscription Lifecycle ====================
     87 
     88 #[tokio::test]
     89 async fn full_subscription_lifecycle() {
     90     init_tracing();
     91 
     92     // Start local relay
     93     let (_relay, url) = create_test_relay().await;
     94 
     95     let mut pool = OutboxPool::default();
     96     let wakeup = MockWakeup::default();
     97 
     98     // 1. Subscribe to the local relay
     99     let mut urls = HashSet::new();
    100     urls.insert(url.clone());
    101     let url_pkgs = RelayUrlPkgs::new(urls);
    102 
    103     let id = {
    104         let mut session = pool.start_session(wakeup.clone());
    105         session.subscribe(trivial_filter(), url_pkgs)
    106     }; // session dropped, REQ sent to relay
    107 
    108     let has_eose = pump_pool_until(&mut pool, 50, Duration::from_millis(5), |pool| {
    109         pool.has_eose(&id)
    110     })
    111     .await;
    112 
    113     assert!(has_eose, "should have received EOSE from relay");
    114 
    115     // 4. Unsubscribe
    116     {
    117         let mut session = pool.start_session(wakeup.clone());
    118         session.unsubscribe(id);
    119     }
    120 
    121     // 5. Verify cleaned up
    122     let status = pool.status(&id);
    123     assert!(
    124         status.is_empty(),
    125         "status should be empty after unsubscribe"
    126     );
    127 }
    128 
    129 // ==================== EOSE Flow End-to-End ====================
    130 
    131 #[tokio::test]
    132 async fn eose_propagation_from_real_relay() {
    133     let (_relay, url) = create_test_relay().await;
    134 
    135     let mut pool = OutboxPool::default();
    136 
    137     // Subscribe with transparent mode (faster EOSE)
    138     let mut urls = HashSet::new();
    139     urls.insert(url.clone());
    140     let mut url_pkgs = RelayUrlPkgs::new(urls);
    141     url_pkgs.use_transparent = true;
    142 
    143     let id = {
    144         let mut session = pool.start_session(MockWakeup::default());
    145         session.subscribe(
    146             vec![Filter::new().kinds(vec![1]).limit(10).build()],
    147             url_pkgs,
    148         )
    149     };
    150 
    151     let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    152 
    153     assert!(got_eose, "EOSE should propagate from relay to pool",);
    154 }
    155 
    156 // ==================== Multi-Relay Coordination ====================
    157 
    158 #[tokio::test]
    159 async fn subscribe_to_multiple_relays() {
    160     // Start two local relays
    161     let (_relay1, url1) = create_test_relay().await;
    162     let (_relay2, url2) = create_test_relay().await;
    163 
    164     let mut pool = OutboxPool::default();
    165     let wakeup = MockWakeup::default();
    166 
    167     // Subscribe to both relays
    168     let mut urls = HashSet::new();
    169     urls.insert(url1.clone());
    170     urls.insert(url2.clone());
    171     let url_pkgs = RelayUrlPkgs::new(urls);
    172 
    173     let id = {
    174         let mut session = pool.start_session(wakeup.clone());
    175         session.subscribe(vec![Filter::new().kinds(vec![1]).build()], url_pkgs)
    176     };
    177 
    178     let got_eoses = pump_pool_until(&mut pool, 50, Duration::from_millis(5), |pool| {
    179         pool.all_have_eose(&id)
    180     })
    181     .await;
    182 
    183     let status = pool.status(&id);
    184     assert_eq!(status.len(), 2);
    185     assert!(got_eoses, "should have eoses from both relays");
    186 }
    187 
    188 // ==================== Modify Relays Mid-Subscription ====================
    189 
    190 #[tokio::test]
    191 async fn modify_relays_adds_and_removes() {
    192     init_tracing();
    193 
    194     let (_relay1, url1) = create_test_relay().await;
    195     let (_relay2, url2) = create_test_relay().await;
    196 
    197     let mut pool = OutboxPool::default();
    198     let wakeup = MockWakeup::default();
    199 
    200     // Start with relay1 only
    201     let mut urls1 = HashSet::new();
    202     urls1.insert(url1.clone());
    203 
    204     let id = {
    205         let mut session = pool.start_session(wakeup.clone());
    206         session.subscribe(
    207             vec![Filter::new().kinds(vec![1]).build()],
    208             RelayUrlPkgs::new(urls1),
    209         )
    210     };
    211 
    212     {
    213         let status = pool.status(&id);
    214         assert_eq!(status.len(), 1);
    215         let (url, res) = status.into_iter().next().unwrap();
    216         assert_eq!(*url, url1);
    217         assert_eq!(res, RelayReqStatus::InitialQuery);
    218     }
    219 
    220     let all_eose = default_pool_pump(&mut pool, |pool| pool.all_have_eose(&id)).await;
    221     assert!(all_eose);
    222 
    223     {
    224         let status = pool.status(&id);
    225         assert_eq!(status.len(), 1);
    226         let (url, _) = status.into_iter().next().unwrap();
    227         assert_eq!(*url, url1.clone());
    228     }
    229 
    230     // Switch to relay2 only
    231     let mut urls2 = HashSet::new();
    232     urls2.insert(url2.clone());
    233 
    234     {
    235         let mut session = pool.start_session(wakeup.clone());
    236         session.modify_relays(id, urls2);
    237     }
    238 
    239     {
    240         let status = pool.status(&id);
    241         assert_eq!(status.len(), 1);
    242         let (url, res) = status.into_iter().next().unwrap();
    243         assert_eq!(*url, url2);
    244         assert_eq!(res, RelayReqStatus::InitialQuery);
    245     }
    246 
    247     let all_eose = default_pool_pump(&mut pool, |pool| pool.all_have_eose(&id)).await;
    248     tracing::info!("pool status: {:?}", pool.status(&id));
    249     assert!(all_eose);
    250 
    251     let status = pool.status(&id);
    252     assert_eq!(
    253         status.len(),
    254         1,
    255         "we are replacing relay {:?} with {:?}",
    256         url1,
    257         url2
    258     );
    259     let (url, _) = status.into_iter().next().unwrap();
    260     assert_eq!(
    261         *url, url2,
    262         "we are replacing relay {:?} with {:?}",
    263         url1, url2
    264     );
    265 }
    266 
    267 // ==================== Subscription with Filters ====================
    268 
    269 #[tokio::test]
    270 async fn subscription_with_complex_filters() {
    271     let (_relay, url) = create_test_relay().await;
    272 
    273     let mut pool = OutboxPool::default();
    274     let wakeup = MockWakeup::default();
    275 
    276     let mut urls = HashSet::new();
    277     urls.insert(url.clone());
    278     let url_pkgs = RelayUrlPkgs::new(urls);
    279 
    280     // Use a more complex filter
    281     let filters = vec![
    282         Filter::new().kinds(vec![1]).build(),
    283         Filter::new().kinds(vec![0]).build(),
    284         Filter::new().kinds(vec![3]).build(),
    285         Filter::new().kinds(vec![4]).limit(100).build(),
    286     ];
    287 
    288     let id = {
    289         let mut session = pool.start_session(wakeup.clone());
    290         session.subscribe(filters, url_pkgs)
    291     };
    292 
    293     let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    294     assert!(got_eose, "should receive EOSE even with multiple filters");
    295 }
    296 
    297 // ==================== Multiple Concurrent Subscriptions ====================
    298 
    299 #[tokio::test]
    300 async fn multiple_concurrent_subscriptions() {
    301     let (_relay, url) = create_test_relay().await;
    302 
    303     let mut pool = OutboxPool::default();
    304     let wakeup = MockWakeup::default();
    305 
    306     let mut urls = HashSet::new();
    307     urls.insert(url.clone());
    308 
    309     // Create multiple subscriptions
    310     let mut ids: Vec<OutboxSubId> = Vec::new();
    311 
    312     {
    313         let mut session = pool.start_session(wakeup.clone());
    314 
    315         for kind in 0..5 {
    316             let id = session.subscribe(
    317                 vec![Filter::new().kinds(vec![kind]).build()],
    318                 RelayUrlPkgs::new(urls.clone()),
    319             );
    320             ids.push(id);
    321         }
    322     }
    323 
    324     assert_eq!(ids.len(), 5);
    325 
    326     let all_eose = default_pool_pump(&mut pool, |pool| {
    327         ids.iter().filter(|id| pool.has_eose(id)).count() == 5
    328     })
    329     .await;
    330 
    331     assert!(all_eose, "at least one subscription should have EOSE");
    332 }
    333 
    334 // ==================== Unsubscribe During Processing ====================
    335 
    336 #[tokio::test]
    337 async fn unsubscribe_during_processing() {
    338     let (_relay, url) = create_test_relay().await;
    339 
    340     let mut pool = OutboxPool::default();
    341 
    342     let mut urls = HashSet::new();
    343     urls.insert(url.clone());
    344     let url_pkgs = RelayUrlPkgs::new(urls);
    345 
    346     let id = {
    347         let mut session = pool.start_session(MockWakeup::default());
    348         session.subscribe(vec![Filter::new().kinds(vec![1]).build()], url_pkgs)
    349     };
    350 
    351     // Immediately unsubscribe
    352     {
    353         let mut session = pool.start_session(MockWakeup::default());
    354         session.unsubscribe(id);
    355     }
    356 
    357     let empty = default_pool_pump(&mut pool, |pool| pool.status(&id).is_empty()).await;
    358 
    359     // Status should be empty after unsubscribe
    360     assert!(empty, "status should be empty after unsubscribe");
    361 }
    362 
    363 // ==================== Transparent vs Compaction Mode ====================
    364 
    365 #[tokio::test]
    366 async fn transparent_mode_subscription() {
    367     let (_relay, url) = create_test_relay().await;
    368 
    369     let mut pool = OutboxPool::default();
    370 
    371     let mut urls = HashSet::new();
    372     urls.insert(url.clone());
    373     let mut url_pkgs = RelayUrlPkgs::new(urls);
    374     url_pkgs.use_transparent = true; // Enable transparent mode
    375 
    376     let id = {
    377         let mut session = pool.start_session(MockWakeup::default());
    378         session.subscribe(trivial_filter(), url_pkgs)
    379     };
    380 
    381     let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    382     assert!(got_eose, "transparent mode should receive EOSE");
    383 }
    384 
    385 #[tokio::test]
    386 async fn compaction_mode_subscription() {
    387     let (_relay, url) = create_test_relay().await;
    388 
    389     let mut pool = OutboxPool::default();
    390 
    391     let mut urls = HashSet::new();
    392     urls.insert(url.clone());
    393     let mut url_pkgs = RelayUrlPkgs::new(urls);
    394     url_pkgs.use_transparent = false; // Compaction mode (default)
    395 
    396     let id = {
    397         let mut session = pool.start_session(MockWakeup::default());
    398         session.subscribe(trivial_filter(), url_pkgs)
    399     };
    400 
    401     let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    402     assert!(got_eose, "compaction mode should receive EOSE");
    403 }
    404 
    405 // ==================== Modify Filters Mid-Subscription ====================
    406 
    407 #[tokio::test]
    408 async fn modify_filters_mid_subscription() {
    409     let (_relay, url) = create_test_relay().await;
    410 
    411     let mut pool = OutboxPool::default();
    412 
    413     let mut urls = HashSet::new();
    414     urls.insert(url.clone());
    415     let url_pkgs = RelayUrlPkgs::new(urls);
    416 
    417     // Start with kind 1
    418     let id = {
    419         let mut session = pool.start_session(MockWakeup::default());
    420         session.subscribe(trivial_filter(), url_pkgs)
    421     };
    422 
    423     // Modify to kind 4
    424     {
    425         let mut session = pool.start_session(MockWakeup::default());
    426         session.modify_filters(id, vec![Filter::new().kinds(vec![4]).limit(9).build()]);
    427     }
    428 
    429     let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    430     assert!(got_eose, "should receive EOSE");
    431 }
    432 
    433 // ==================== Connection Resilience ====================
    434 
    435 fn trivial_filter() -> Vec<Filter> {
    436     vec![Filter::new().kinds([1]).build()]
    437 }
    438 
    439 #[tokio::test]
    440 async fn websocket_status_tracking() {
    441     let (_relay, url) = create_test_relay().await;
    442 
    443     let mut pool = OutboxPool::default();
    444     let wakeup = MockWakeup::default();
    445 
    446     let mut urls = HashSet::new();
    447     urls.insert(url.clone());
    448     let url_pkgs = RelayUrlPkgs::new(urls);
    449 
    450     {
    451         let mut session = pool.start_session(wakeup.clone());
    452         session.subscribe(trivial_filter(), url_pkgs);
    453     }
    454 
    455     // Check websocket statuses
    456     let statuses = pool.websocket_statuses();
    457     // Should have at least one relay tracked
    458     assert!(!statuses.is_empty(), "should track websocket statuses");
    459 }
    460 
    461 // ==================== Failure Paths ====================
    462 
    463 /// Subscribing to an unreachable relay should remain disconnected and never report EOSE.
    464 #[tokio::test]
    465 async fn unreachable_relay_reports_disconnected_status() {
    466     let mut pool = OutboxPool::default();
    467     let wakeup = MockWakeup::default();
    468     let unreachable =
    469         NormRelayUrl::new("wss://127.0.0.1:6555").expect("valid unreachable relay url");
    470 
    471     let mut urls = HashSet::new();
    472     urls.insert(unreachable.clone());
    473     let url_pkgs = RelayUrlPkgs::new(urls);
    474 
    475     let id = {
    476         let mut session = pool.start_session(wakeup);
    477         session.subscribe(trivial_filter(), url_pkgs)
    478     };
    479 
    480     let got_eose = pump_pool_until(&mut pool, 10, Duration::from_millis(10), |pool| {
    481         pool.has_eose(&id)
    482     })
    483     .await;
    484     assert!(
    485         !got_eose,
    486         "unreachable relay should never yield an EOSE signal"
    487     );
    488 
    489     // Should survive keepalive pings even when no websocket is available.
    490     pool.keepalive_ping(|| {});
    491 
    492     let statuses = pool.websocket_statuses();
    493     let status = statuses
    494         .into_iter()
    495         .find(|(relay_url, _)| *relay_url == &unreachable)
    496         .map(|(_, status)| status)
    497         .expect("missing unreachable relay status");
    498     assert_eq!(status, RelayStatus::Disconnected);
    499 }
    500 
    501 // ==================== Oneshot Subscription Removal After EOSE ====================
    502 
    503 /// Oneshot subscriptions should be removed from the pool after EOSE is received.
    504 #[tokio::test]
    505 async fn oneshot_subscription_removed_after_eose() {
    506     let (_relay, url) = create_test_relay().await;
    507 
    508     let mut pool = OutboxPool::default();
    509 
    510     let mut urls = HashSet::new();
    511     urls.insert(url.clone());
    512     let url_pkgs = RelayUrlPkgs::new(urls);
    513 
    514     // Create a oneshot subscription via the handler, then export to get the ID
    515     let id = {
    516         let mut handler = pool.start_session(MockWakeup::default());
    517         handler.oneshot(trivial_filter(), url_pkgs);
    518         let session = handler.export();
    519         // Get the ID from the session's tasks
    520         let id = *session
    521             .tasks
    522             .keys()
    523             .next()
    524             .expect("oneshot should create a task");
    525         OutboxSessionHandler::import(&mut pool, session, MockWakeup::default());
    526         id
    527     };
    528 
    529     // Verify subscription exists
    530     let filters_before = pool.filters(&id);
    531     assert!(
    532         filters_before.is_some(),
    533         "oneshot subscription should exist before EOSE"
    534     );
    535 
    536     // Wait for EOSE
    537     let got_eose = pump_pool_until(&mut pool, 50, Duration::from_millis(5), |pool| {
    538         pool.has_eose(&id)
    539     })
    540     .await;
    541     assert!(got_eose, "should receive EOSE for oneshot subscription");
    542 
    543     // Trigger EOSE processing by starting an empty session
    544     {
    545         let _ = pool.start_session(MockWakeup::default());
    546     }
    547 
    548     // Verify subscription was removed
    549     let filters_after = pool.filters(&id);
    550     assert!(
    551         filters_after.is_none(),
    552         "oneshot subscription should be removed after EOSE"
    553     );
    554 }
    555 
    556 /// Oneshot subscriptions across multiple relays should fully clean up after all EOSEs.
    557 #[tokio::test]
    558 async fn oneshot_multi_relay_fully_removed_after_eose() {
    559     let (_relay1, url1) = create_test_relay().await;
    560     let (_relay2, url2) = create_test_relay().await;
    561 
    562     let mut pool = OutboxPool::default();
    563 
    564     let mut urls = HashSet::new();
    565     urls.insert(url1.clone());
    566     urls.insert(url2.clone());
    567     let url_pkgs = RelayUrlPkgs::new(urls);
    568 
    569     let id = {
    570         let mut handler = pool.start_session(MockWakeup::default());
    571         handler.oneshot(trivial_filter(), url_pkgs);
    572         let session = handler.export();
    573         let id = *session
    574             .tasks
    575             .keys()
    576             .next()
    577             .expect("oneshot should create a task");
    578         OutboxSessionHandler::import(&mut pool, session, MockWakeup::default());
    579         id
    580     };
    581 
    582     let got_all_eose = pump_pool_until(&mut pool, 100, Duration::from_millis(10), |pool| {
    583         pool.all_have_eose(&id)
    584     })
    585     .await;
    586     assert!(got_all_eose, "oneshot should receive EOSE from all relays");
    587 
    588     {
    589         let _ = pool.start_session(MockWakeup::default());
    590     }
    591 
    592     assert!(
    593         pool.filters(&id).is_none(),
    594         "oneshot metadata should be removed after EOSE processing"
    595     );
    596     assert!(
    597         pool.status(&id).is_empty(),
    598         "oneshot should be fully unsubscribed on all relays after EOSE processing"
    599     );
    600 }
    601 
    602 // ==================== Since Optimization After EOSE ====================
    603 
    604 fn filter_has_since(filter: &Filter) -> bool {
    605     filter.since().is_some()
    606 }
    607 
    608 /// After EOSE is received, filters should have `since` applied for future re-subscriptions.
    609 #[tokio::test]
    610 async fn eose_applies_since_to_filters() {
    611     let (_relay, url) = create_test_relay().await;
    612 
    613     let mut pool = OutboxPool::default();
    614 
    615     // Subscribe with transparent mode (faster EOSE)
    616     let mut urls = HashSet::new();
    617     urls.insert(url.clone());
    618     let mut url_pkgs = RelayUrlPkgs::new(urls);
    619     url_pkgs.use_transparent = true;
    620 
    621     let id = {
    622         let mut session = pool.start_session(MockWakeup::default());
    623         session.subscribe(
    624             vec![Filter::new().kinds(vec![1]).limit(10).build()],
    625             url_pkgs,
    626         )
    627     };
    628 
    629     // Verify filters don't have since initially
    630     let initial_filters = pool.filters(&id).expect("subscription exists");
    631     assert!(
    632         !filter_has_since(&initial_filters[0]),
    633         "filters should not have since before EOSE"
    634     );
    635 
    636     // Wait for EOSE
    637     let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    638     assert!(got_eose, "should receive EOSE");
    639 
    640     // Create an empty session to trigger EOSE queue processing
    641     // (ingest_session is called when the handler is dropped)
    642     {
    643         let _ = pool.start_session(MockWakeup::default());
    644     }
    645 
    646     // After EOSE processing, filters should have since applied
    647     let optimized_filters = pool.filters(&id).expect("subscription still exists");
    648 
    649     assert!(
    650         filter_has_since(&optimized_filters[0]),
    651         "filters should have since after EOSE"
    652     );
    653 }
    654 
    655 /// Since optimization should wait until every relay for the subscription reaches EOSE.
    656 #[tokio::test]
    657 async fn since_optimization_waits_for_all_relays_eose() {
    658     let (_relay, live_url) = create_test_relay().await;
    659     let dead_url = NormRelayUrl::new("wss://127.0.0.1:1").expect("valid dead relay url");
    660 
    661     let mut pool = OutboxPool::default();
    662 
    663     let mut urls = HashSet::new();
    664     urls.insert(live_url);
    665     urls.insert(dead_url);
    666     let mut url_pkgs = RelayUrlPkgs::new(urls);
    667     url_pkgs.use_transparent = true;
    668 
    669     let id = {
    670         let mut session = pool.start_session(MockWakeup::default());
    671         session.subscribe(
    672             vec![Filter::new().kinds(vec![1]).limit(10).build()],
    673             url_pkgs,
    674         )
    675     };
    676 
    677     let initial_filters = pool.filters(&id).expect("subscription exists");
    678     assert!(
    679         !filter_has_since(&initial_filters[0]),
    680         "filters should not have since before any EOSE"
    681     );
    682 
    683     let got_any_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await;
    684     assert!(got_any_eose, "live relay should produce EOSE");
    685     assert!(
    686         !pool.all_have_eose(&id),
    687         "all relays should not have EOSE when one relay is unreachable"
    688     );
    689 
    690     // Trigger EOSE queue processing.
    691     {
    692         let _ = pool.start_session(MockWakeup::default());
    693     }
    694 
    695     let filters = pool.filters(&id).expect("subscription still exists");
    696     assert!(
    697         !filter_has_since(&filters[0]),
    698         "since should not be optimized until every relay reaches EOSE"
    699     );
    700 }