notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit f7f3c57a8e590017ae925464188b944b37e4fa28
parent 0e4bce9ac885c3506f0573c9c91c35e3e5aad3ca
Author: kernelkind <kernelkind@gmail.com>
Date:   Sun,  1 Feb 2026 22:04:42 -0500

feat(outbox): add `MulticastRelayCache`

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
Mcrates/enostr/src/relay/mod.rs | 2+-
Mcrates/enostr/src/relay/multicast.rs | 60+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/crates/enostr/src/relay/mod.rs b/crates/enostr/src/relay/mod.rs @@ -14,7 +14,7 @@ pub use identity::{ pub use limits::{ RelayCoordinatorLimits, RelayLimitations, SubPass, SubPassGuardian, SubPassRevocation, }; -pub use multicast::MulticastRelay; +pub use multicast::{MulticastRelay, MulticastRelayCache}; use nostrdb::Filter; pub use websocket::{WebsocketConn, WebsocketRelay}; diff --git a/crates/enostr/src/relay/multicast.rs b/crates/enostr/src/relay/multicast.rs @@ -5,7 +5,8 @@ use std::net::IpAddr; use std::net::{SocketAddr, SocketAddrV4}; use std::time::{Duration, Instant}; -use crate::{EventClientMessage, RelayStatus, Result}; +use crate::relay::{BroadcastCache, BroadcastRelay, RawEventData, RelayImplType}; +use crate::{EventClientMessage, RelayStatus, Result, Wakeup}; use std::net::Ipv4Addr; use tracing::{debug, error}; @@ -138,3 +139,60 @@ pub fn setup_multicast_relay( Ok(MulticastRelay::new(multicast_address, socket, interface)) } +/// MulticastRelayCache lazily initializes the multicast connection and buffers +/// outbound events until a connection is available. +#[derive(Default)] +pub struct MulticastRelayCache { + multicast: Option<MulticastRelay>, + cache: BroadcastCache, +} + +impl MulticastRelayCache { + pub fn is_setup(&self) -> bool { + self.multicast.is_some() + } + + pub fn try_setup<W>(&mut self, wakeup: &W) + where + W: Wakeup, + { + let wake = wakeup.clone(); + let Ok(multicast) = setup_multicast_relay(move || wake.wake()) else { + return; + }; + + self.multicast = Some(multicast); + } + + pub fn broadcast(&mut self, msg: EventClientMessage) { + BroadcastRelay::multicast(self.multicast.as_mut(), &mut self.cache).broadcast(msg); + } + + #[profiling::function] + pub fn try_recv<F>(&mut self, mut process: F) + where + for<'a> F: FnMut(RawEventData<'a>), + { + let Some(multicast) = &mut self.multicast else { + return; + }; + + if multicast.should_rejoin() { + if let Err(e) = multicast.rejoin() { + tracing::error!("multicast: rejoin error: {e}"); + } + } + + BroadcastRelay::multicast(Some(multicast), &mut self.cache).try_flush_queue(); + + let Some(WsEvent::Message(WsMessage::Text(text))) = multicast.try_recv() else { + return; + }; + + process(RawEventData { + url: "multicast", + event_json: &text, + relay_type: RelayImplType::Multicast, + }); + } +}