outbox_integration.rs (20949B)
1 //! Integration tests for the Outbox relay system 2 //! 3 //! These tests use `nostr-relay-builder::LocalRelay` to run a real relay on localhost 4 //! and test the full subscription lifecycle, EOSE propagation, and multi-relay coordination. 5 6 use enostr::{ 7 NormRelayUrl, OutboxPool, OutboxSessionHandler, OutboxSubId, RelayReqStatus, RelayStatus, 8 RelayUrlPkgs, Wakeup, 9 }; 10 use hashbrown::HashSet; 11 use nostr_relay_builder::{LocalRelay, RelayBuilder}; 12 use nostrdb::Filter; 13 use std::sync::Once; 14 use std::time::Duration; 15 16 static TRACING_INIT: Once = Once::new(); 17 18 /// Initialize tracing for tests (only runs once even if called multiple times) 19 fn init_tracing() { 20 TRACING_INIT.call_once(|| { 21 tracing_subscriber::fmt() 22 .with_env_filter( 23 tracing_subscriber::EnvFilter::from_default_env() 24 .add_directive("enostr=debug".parse().unwrap()), 25 ) 26 .with_test_writer() 27 .init(); 28 }); 29 } 30 31 /// A mock Wakeup implementation for integration tests 32 #[derive(Clone, Default)] 33 pub struct MockWakeup {} 34 35 impl Wakeup for MockWakeup { 36 fn wake(&self) {} 37 } 38 39 /// Helper to create a LocalRelay with default settings for tests. 40 /// Returns the relay handle (must be kept alive) and its normalized URL. 41 async fn create_test_relay() -> (LocalRelay, NormRelayUrl) { 42 let relay = LocalRelay::run(RelayBuilder::default()) 43 .await 44 .expect("failed to start relay"); 45 46 let url_str = relay.url(); 47 tracing::info!("LocalRelay listening at {}", url_str); 48 49 let url = NormRelayUrl::new(&url_str).expect("valid relay url"); 50 (relay, url) 51 } 52 53 /// Polls the pool until the provided predicate returns true or the attempt limit is reached. 54 /// Returns the attempt count and whether the predicate was ultimately satisfied. 55 async fn pump_pool_until<F>( 56 pool: &mut OutboxPool, 57 max_attempts: usize, 58 sleep_duration: Duration, 59 mut predicate: F, 60 ) -> bool 61 where 62 F: FnMut(&mut OutboxPool) -> bool, 63 { 64 let mut attempts = 0; 65 for attempt in 0..max_attempts { 66 pool.try_recv(10, |_| {}); 67 if predicate(pool) { 68 return true; 69 } 70 tokio::time::sleep(sleep_duration).await; 71 attempts = attempt; 72 } 73 74 tracing::trace!("completed pool pump in {attempts} attempts"); 75 76 predicate(pool) 77 } 78 79 async fn default_pool_pump<F>(pool: &mut OutboxPool, predicate: F) -> bool 80 where 81 F: FnMut(&mut OutboxPool) -> bool, 82 { 83 pump_pool_until(pool, 100, Duration::from_millis(15), predicate).await 84 } 85 86 // ==================== Full Subscription Lifecycle ==================== 87 88 #[tokio::test] 89 async fn full_subscription_lifecycle() { 90 init_tracing(); 91 92 // Start local relay 93 let (_relay, url) = create_test_relay().await; 94 95 let mut pool = OutboxPool::default(); 96 let wakeup = MockWakeup::default(); 97 98 // 1. Subscribe to the local relay 99 let mut urls = HashSet::new(); 100 urls.insert(url.clone()); 101 let url_pkgs = RelayUrlPkgs::new(urls); 102 103 let id = { 104 let mut session = pool.start_session(wakeup.clone()); 105 session.subscribe(trivial_filter(), url_pkgs) 106 }; // session dropped, REQ sent to relay 107 108 let has_eose = pump_pool_until(&mut pool, 50, Duration::from_millis(5), |pool| { 109 pool.has_eose(&id) 110 }) 111 .await; 112 113 assert!(has_eose, "should have received EOSE from relay"); 114 115 // 4. Unsubscribe 116 { 117 let mut session = pool.start_session(wakeup.clone()); 118 session.unsubscribe(id); 119 } 120 121 // 5. Verify cleaned up 122 let status = pool.status(&id); 123 assert!( 124 status.is_empty(), 125 "status should be empty after unsubscribe" 126 ); 127 } 128 129 // ==================== EOSE Flow End-to-End ==================== 130 131 #[tokio::test] 132 async fn eose_propagation_from_real_relay() { 133 let (_relay, url) = create_test_relay().await; 134 135 let mut pool = OutboxPool::default(); 136 137 // Subscribe with transparent mode (faster EOSE) 138 let mut urls = HashSet::new(); 139 urls.insert(url.clone()); 140 let mut url_pkgs = RelayUrlPkgs::new(urls); 141 url_pkgs.use_transparent = true; 142 143 let id = { 144 let mut session = pool.start_session(MockWakeup::default()); 145 session.subscribe( 146 vec![Filter::new().kinds(vec![1]).limit(10).build()], 147 url_pkgs, 148 ) 149 }; 150 151 let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 152 153 assert!(got_eose, "EOSE should propagate from relay to pool",); 154 } 155 156 // ==================== Multi-Relay Coordination ==================== 157 158 #[tokio::test] 159 async fn subscribe_to_multiple_relays() { 160 // Start two local relays 161 let (_relay1, url1) = create_test_relay().await; 162 let (_relay2, url2) = create_test_relay().await; 163 164 let mut pool = OutboxPool::default(); 165 let wakeup = MockWakeup::default(); 166 167 // Subscribe to both relays 168 let mut urls = HashSet::new(); 169 urls.insert(url1.clone()); 170 urls.insert(url2.clone()); 171 let url_pkgs = RelayUrlPkgs::new(urls); 172 173 let id = { 174 let mut session = pool.start_session(wakeup.clone()); 175 session.subscribe(vec![Filter::new().kinds(vec![1]).build()], url_pkgs) 176 }; 177 178 let got_eoses = pump_pool_until(&mut pool, 50, Duration::from_millis(5), |pool| { 179 pool.all_have_eose(&id) 180 }) 181 .await; 182 183 let status = pool.status(&id); 184 assert_eq!(status.len(), 2); 185 assert!(got_eoses, "should have eoses from both relays"); 186 } 187 188 // ==================== Modify Relays Mid-Subscription ==================== 189 190 #[tokio::test] 191 async fn modify_relays_adds_and_removes() { 192 init_tracing(); 193 194 let (_relay1, url1) = create_test_relay().await; 195 let (_relay2, url2) = create_test_relay().await; 196 197 let mut pool = OutboxPool::default(); 198 let wakeup = MockWakeup::default(); 199 200 // Start with relay1 only 201 let mut urls1 = HashSet::new(); 202 urls1.insert(url1.clone()); 203 204 let id = { 205 let mut session = pool.start_session(wakeup.clone()); 206 session.subscribe( 207 vec![Filter::new().kinds(vec![1]).build()], 208 RelayUrlPkgs::new(urls1), 209 ) 210 }; 211 212 { 213 let status = pool.status(&id); 214 assert_eq!(status.len(), 1); 215 let (url, res) = status.into_iter().next().unwrap(); 216 assert_eq!(*url, url1); 217 assert_eq!(res, RelayReqStatus::InitialQuery); 218 } 219 220 let all_eose = default_pool_pump(&mut pool, |pool| pool.all_have_eose(&id)).await; 221 assert!(all_eose); 222 223 { 224 let status = pool.status(&id); 225 assert_eq!(status.len(), 1); 226 let (url, _) = status.into_iter().next().unwrap(); 227 assert_eq!(*url, url1.clone()); 228 } 229 230 // Switch to relay2 only 231 let mut urls2 = HashSet::new(); 232 urls2.insert(url2.clone()); 233 234 { 235 let mut session = pool.start_session(wakeup.clone()); 236 session.modify_relays(id, urls2); 237 } 238 239 { 240 let status = pool.status(&id); 241 assert_eq!(status.len(), 1); 242 let (url, res) = status.into_iter().next().unwrap(); 243 assert_eq!(*url, url2); 244 assert_eq!(res, RelayReqStatus::InitialQuery); 245 } 246 247 let all_eose = default_pool_pump(&mut pool, |pool| pool.all_have_eose(&id)).await; 248 tracing::info!("pool status: {:?}", pool.status(&id)); 249 assert!(all_eose); 250 251 let status = pool.status(&id); 252 assert_eq!( 253 status.len(), 254 1, 255 "we are replacing relay {:?} with {:?}", 256 url1, 257 url2 258 ); 259 let (url, _) = status.into_iter().next().unwrap(); 260 assert_eq!( 261 *url, url2, 262 "we are replacing relay {:?} with {:?}", 263 url1, url2 264 ); 265 } 266 267 // ==================== Subscription with Filters ==================== 268 269 #[tokio::test] 270 async fn subscription_with_complex_filters() { 271 let (_relay, url) = create_test_relay().await; 272 273 let mut pool = OutboxPool::default(); 274 let wakeup = MockWakeup::default(); 275 276 let mut urls = HashSet::new(); 277 urls.insert(url.clone()); 278 let url_pkgs = RelayUrlPkgs::new(urls); 279 280 // Use a more complex filter 281 let filters = vec![ 282 Filter::new().kinds(vec![1]).build(), 283 Filter::new().kinds(vec![0]).build(), 284 Filter::new().kinds(vec![3]).build(), 285 Filter::new().kinds(vec![4]).limit(100).build(), 286 ]; 287 288 let id = { 289 let mut session = pool.start_session(wakeup.clone()); 290 session.subscribe(filters, url_pkgs) 291 }; 292 293 let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 294 assert!(got_eose, "should receive EOSE even with multiple filters"); 295 } 296 297 // ==================== Multiple Concurrent Subscriptions ==================== 298 299 #[tokio::test] 300 async fn multiple_concurrent_subscriptions() { 301 let (_relay, url) = create_test_relay().await; 302 303 let mut pool = OutboxPool::default(); 304 let wakeup = MockWakeup::default(); 305 306 let mut urls = HashSet::new(); 307 urls.insert(url.clone()); 308 309 // Create multiple subscriptions 310 let mut ids: Vec<OutboxSubId> = Vec::new(); 311 312 { 313 let mut session = pool.start_session(wakeup.clone()); 314 315 for kind in 0..5 { 316 let id = session.subscribe( 317 vec![Filter::new().kinds(vec![kind]).build()], 318 RelayUrlPkgs::new(urls.clone()), 319 ); 320 ids.push(id); 321 } 322 } 323 324 assert_eq!(ids.len(), 5); 325 326 let all_eose = default_pool_pump(&mut pool, |pool| { 327 ids.iter().filter(|id| pool.has_eose(id)).count() == 5 328 }) 329 .await; 330 331 assert!(all_eose, "at least one subscription should have EOSE"); 332 } 333 334 // ==================== Unsubscribe During Processing ==================== 335 336 #[tokio::test] 337 async fn unsubscribe_during_processing() { 338 let (_relay, url) = create_test_relay().await; 339 340 let mut pool = OutboxPool::default(); 341 342 let mut urls = HashSet::new(); 343 urls.insert(url.clone()); 344 let url_pkgs = RelayUrlPkgs::new(urls); 345 346 let id = { 347 let mut session = pool.start_session(MockWakeup::default()); 348 session.subscribe(vec![Filter::new().kinds(vec![1]).build()], url_pkgs) 349 }; 350 351 // Immediately unsubscribe 352 { 353 let mut session = pool.start_session(MockWakeup::default()); 354 session.unsubscribe(id); 355 } 356 357 let empty = default_pool_pump(&mut pool, |pool| pool.status(&id).is_empty()).await; 358 359 // Status should be empty after unsubscribe 360 assert!(empty, "status should be empty after unsubscribe"); 361 } 362 363 // ==================== Transparent vs Compaction Mode ==================== 364 365 #[tokio::test] 366 async fn transparent_mode_subscription() { 367 let (_relay, url) = create_test_relay().await; 368 369 let mut pool = OutboxPool::default(); 370 371 let mut urls = HashSet::new(); 372 urls.insert(url.clone()); 373 let mut url_pkgs = RelayUrlPkgs::new(urls); 374 url_pkgs.use_transparent = true; // Enable transparent mode 375 376 let id = { 377 let mut session = pool.start_session(MockWakeup::default()); 378 session.subscribe(trivial_filter(), url_pkgs) 379 }; 380 381 let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 382 assert!(got_eose, "transparent mode should receive EOSE"); 383 } 384 385 #[tokio::test] 386 async fn compaction_mode_subscription() { 387 let (_relay, url) = create_test_relay().await; 388 389 let mut pool = OutboxPool::default(); 390 391 let mut urls = HashSet::new(); 392 urls.insert(url.clone()); 393 let mut url_pkgs = RelayUrlPkgs::new(urls); 394 url_pkgs.use_transparent = false; // Compaction mode (default) 395 396 let id = { 397 let mut session = pool.start_session(MockWakeup::default()); 398 session.subscribe(trivial_filter(), url_pkgs) 399 }; 400 401 let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 402 assert!(got_eose, "compaction mode should receive EOSE"); 403 } 404 405 // ==================== Modify Filters Mid-Subscription ==================== 406 407 #[tokio::test] 408 async fn modify_filters_mid_subscription() { 409 let (_relay, url) = create_test_relay().await; 410 411 let mut pool = OutboxPool::default(); 412 413 let mut urls = HashSet::new(); 414 urls.insert(url.clone()); 415 let url_pkgs = RelayUrlPkgs::new(urls); 416 417 // Start with kind 1 418 let id = { 419 let mut session = pool.start_session(MockWakeup::default()); 420 session.subscribe(trivial_filter(), url_pkgs) 421 }; 422 423 // Modify to kind 4 424 { 425 let mut session = pool.start_session(MockWakeup::default()); 426 session.modify_filters(id, vec![Filter::new().kinds(vec![4]).limit(9).build()]); 427 } 428 429 let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 430 assert!(got_eose, "should receive EOSE"); 431 } 432 433 // ==================== Connection Resilience ==================== 434 435 fn trivial_filter() -> Vec<Filter> { 436 vec![Filter::new().kinds([1]).build()] 437 } 438 439 #[tokio::test] 440 async fn websocket_status_tracking() { 441 let (_relay, url) = create_test_relay().await; 442 443 let mut pool = OutboxPool::default(); 444 let wakeup = MockWakeup::default(); 445 446 let mut urls = HashSet::new(); 447 urls.insert(url.clone()); 448 let url_pkgs = RelayUrlPkgs::new(urls); 449 450 { 451 let mut session = pool.start_session(wakeup.clone()); 452 session.subscribe(trivial_filter(), url_pkgs); 453 } 454 455 // Check websocket statuses 456 let statuses = pool.websocket_statuses(); 457 // Should have at least one relay tracked 458 assert!(!statuses.is_empty(), "should track websocket statuses"); 459 } 460 461 // ==================== Failure Paths ==================== 462 463 /// Subscribing to an unreachable relay should remain disconnected and never report EOSE. 464 #[tokio::test] 465 async fn unreachable_relay_reports_disconnected_status() { 466 let mut pool = OutboxPool::default(); 467 let wakeup = MockWakeup::default(); 468 let unreachable = 469 NormRelayUrl::new("wss://127.0.0.1:6555").expect("valid unreachable relay url"); 470 471 let mut urls = HashSet::new(); 472 urls.insert(unreachable.clone()); 473 let url_pkgs = RelayUrlPkgs::new(urls); 474 475 let id = { 476 let mut session = pool.start_session(wakeup); 477 session.subscribe(trivial_filter(), url_pkgs) 478 }; 479 480 let got_eose = pump_pool_until(&mut pool, 10, Duration::from_millis(10), |pool| { 481 pool.has_eose(&id) 482 }) 483 .await; 484 assert!( 485 !got_eose, 486 "unreachable relay should never yield an EOSE signal" 487 ); 488 489 // Should survive keepalive pings even when no websocket is available. 490 pool.keepalive_ping(|| {}); 491 492 let statuses = pool.websocket_statuses(); 493 let status = statuses 494 .into_iter() 495 .find(|(relay_url, _)| *relay_url == &unreachable) 496 .map(|(_, status)| status) 497 .expect("missing unreachable relay status"); 498 assert_eq!(status, RelayStatus::Disconnected); 499 } 500 501 // ==================== Oneshot Subscription Removal After EOSE ==================== 502 503 /// Oneshot subscriptions should be removed from the pool after EOSE is received. 504 #[tokio::test] 505 async fn oneshot_subscription_removed_after_eose() { 506 let (_relay, url) = create_test_relay().await; 507 508 let mut pool = OutboxPool::default(); 509 510 let mut urls = HashSet::new(); 511 urls.insert(url.clone()); 512 let url_pkgs = RelayUrlPkgs::new(urls); 513 514 // Create a oneshot subscription via the handler, then export to get the ID 515 let id = { 516 let mut handler = pool.start_session(MockWakeup::default()); 517 handler.oneshot(trivial_filter(), url_pkgs); 518 let session = handler.export(); 519 // Get the ID from the session's tasks 520 let id = *session 521 .tasks 522 .keys() 523 .next() 524 .expect("oneshot should create a task"); 525 OutboxSessionHandler::import(&mut pool, session, MockWakeup::default()); 526 id 527 }; 528 529 // Verify subscription exists 530 let filters_before = pool.filters(&id); 531 assert!( 532 filters_before.is_some(), 533 "oneshot subscription should exist before EOSE" 534 ); 535 536 // Wait for EOSE 537 let got_eose = pump_pool_until(&mut pool, 50, Duration::from_millis(5), |pool| { 538 pool.has_eose(&id) 539 }) 540 .await; 541 assert!(got_eose, "should receive EOSE for oneshot subscription"); 542 543 // Trigger EOSE processing by starting an empty session 544 { 545 let _ = pool.start_session(MockWakeup::default()); 546 } 547 548 // Verify subscription was removed 549 let filters_after = pool.filters(&id); 550 assert!( 551 filters_after.is_none(), 552 "oneshot subscription should be removed after EOSE" 553 ); 554 } 555 556 /// Oneshot subscriptions across multiple relays should fully clean up after all EOSEs. 557 #[tokio::test] 558 async fn oneshot_multi_relay_fully_removed_after_eose() { 559 let (_relay1, url1) = create_test_relay().await; 560 let (_relay2, url2) = create_test_relay().await; 561 562 let mut pool = OutboxPool::default(); 563 564 let mut urls = HashSet::new(); 565 urls.insert(url1.clone()); 566 urls.insert(url2.clone()); 567 let url_pkgs = RelayUrlPkgs::new(urls); 568 569 let id = { 570 let mut handler = pool.start_session(MockWakeup::default()); 571 handler.oneshot(trivial_filter(), url_pkgs); 572 let session = handler.export(); 573 let id = *session 574 .tasks 575 .keys() 576 .next() 577 .expect("oneshot should create a task"); 578 OutboxSessionHandler::import(&mut pool, session, MockWakeup::default()); 579 id 580 }; 581 582 let got_all_eose = pump_pool_until(&mut pool, 100, Duration::from_millis(10), |pool| { 583 pool.all_have_eose(&id) 584 }) 585 .await; 586 assert!(got_all_eose, "oneshot should receive EOSE from all relays"); 587 588 { 589 let _ = pool.start_session(MockWakeup::default()); 590 } 591 592 assert!( 593 pool.filters(&id).is_none(), 594 "oneshot metadata should be removed after EOSE processing" 595 ); 596 assert!( 597 pool.status(&id).is_empty(), 598 "oneshot should be fully unsubscribed on all relays after EOSE processing" 599 ); 600 } 601 602 // ==================== Since Optimization After EOSE ==================== 603 604 fn filter_has_since(filter: &Filter) -> bool { 605 filter.since().is_some() 606 } 607 608 /// After EOSE is received, filters should have `since` applied for future re-subscriptions. 609 #[tokio::test] 610 async fn eose_applies_since_to_filters() { 611 let (_relay, url) = create_test_relay().await; 612 613 let mut pool = OutboxPool::default(); 614 615 // Subscribe with transparent mode (faster EOSE) 616 let mut urls = HashSet::new(); 617 urls.insert(url.clone()); 618 let mut url_pkgs = RelayUrlPkgs::new(urls); 619 url_pkgs.use_transparent = true; 620 621 let id = { 622 let mut session = pool.start_session(MockWakeup::default()); 623 session.subscribe( 624 vec![Filter::new().kinds(vec![1]).limit(10).build()], 625 url_pkgs, 626 ) 627 }; 628 629 // Verify filters don't have since initially 630 let initial_filters = pool.filters(&id).expect("subscription exists"); 631 assert!( 632 !filter_has_since(&initial_filters[0]), 633 "filters should not have since before EOSE" 634 ); 635 636 // Wait for EOSE 637 let got_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 638 assert!(got_eose, "should receive EOSE"); 639 640 // Create an empty session to trigger EOSE queue processing 641 // (ingest_session is called when the handler is dropped) 642 { 643 let _ = pool.start_session(MockWakeup::default()); 644 } 645 646 // After EOSE processing, filters should have since applied 647 let optimized_filters = pool.filters(&id).expect("subscription still exists"); 648 649 assert!( 650 filter_has_since(&optimized_filters[0]), 651 "filters should have since after EOSE" 652 ); 653 } 654 655 /// Since optimization should wait until every relay for the subscription reaches EOSE. 656 #[tokio::test] 657 async fn since_optimization_waits_for_all_relays_eose() { 658 let (_relay, live_url) = create_test_relay().await; 659 let dead_url = NormRelayUrl::new("wss://127.0.0.1:1").expect("valid dead relay url"); 660 661 let mut pool = OutboxPool::default(); 662 663 let mut urls = HashSet::new(); 664 urls.insert(live_url); 665 urls.insert(dead_url); 666 let mut url_pkgs = RelayUrlPkgs::new(urls); 667 url_pkgs.use_transparent = true; 668 669 let id = { 670 let mut session = pool.start_session(MockWakeup::default()); 671 session.subscribe( 672 vec![Filter::new().kinds(vec![1]).limit(10).build()], 673 url_pkgs, 674 ) 675 }; 676 677 let initial_filters = pool.filters(&id).expect("subscription exists"); 678 assert!( 679 !filter_has_since(&initial_filters[0]), 680 "filters should not have since before any EOSE" 681 ); 682 683 let got_any_eose = default_pool_pump(&mut pool, |pool| pool.has_eose(&id)).await; 684 assert!(got_any_eose, "live relay should produce EOSE"); 685 assert!( 686 !pool.all_have_eose(&id), 687 "all relays should not have EOSE when one relay is unreachable" 688 ); 689 690 // Trigger EOSE queue processing. 691 { 692 let _ = pool.start_session(MockWakeup::default()); 693 } 694 695 let filters = pool.filters(&id).expect("subscription still exists"); 696 assert!( 697 !filter_has_since(&filters[0]), 698 "since should not be optimized until every relay reaches EOSE" 699 ); 700 }