broadcast.rs (2796B)
1 use crate::{ 2 relay::{MulticastRelay, UnownedRelay, WebsocketRelay}, 3 ClientMessage, EventClientMessage, RelayStatus, 4 }; 5 6 /// BroadcastCache stores queued events for relays that are temporarily disconnected. 7 #[derive(Default)] 8 pub struct BroadcastCache { 9 to_send: Vec<EventClientMessage>, 10 } 11 12 /// BroadcastRelay sends events to either a websocket relay or the multicast relay 13 /// while handling retries via the shared cache. 14 pub struct BroadcastRelay<'a> { 15 relay: Option<UnownedRelay<'a>>, 16 cache: &'a mut BroadcastCache, 17 } 18 19 impl<'a> BroadcastRelay<'a> { 20 pub fn websocket( 21 websocket: Option<&'a mut WebsocketRelay>, 22 cache: &'a mut BroadcastCache, 23 ) -> Self { 24 Self { 25 relay: websocket.map(UnownedRelay::Websocket), 26 cache, 27 } 28 } 29 30 pub fn multicast( 31 multicast: Option<&'a mut MulticastRelay>, 32 cache: &'a mut BroadcastCache, 33 ) -> Self { 34 Self { 35 relay: multicast.map(UnownedRelay::Multicast), 36 cache, 37 } 38 } 39 40 pub fn broadcast(&mut self, msg: EventClientMessage) { 41 let Some(relay) = &mut self.relay else { 42 self.cache.to_send.push(msg); 43 return; 44 }; 45 46 match relay { 47 UnownedRelay::Websocket(websocket_relay) => { 48 if !websocket_relay.is_connected() { 49 self.cache.to_send.push(msg); 50 return; 51 } 52 53 websocket_relay.conn.send(&ClientMessage::Event(msg)); 54 } 55 UnownedRelay::Multicast(multicast) => { 56 // Always queue if we're not connected. 57 if multicast.status() != RelayStatus::Connected { 58 self.cache.to_send.push(msg.clone()); 59 return; 60 } 61 62 if multicast.send(&msg).is_err() { 63 self.cache.to_send.push(msg.clone()); 64 } 65 } 66 } 67 } 68 69 #[profiling::function] 70 pub fn try_flush_queue(&mut self) { 71 let Some(relay) = &mut self.relay else { 72 return; 73 }; 74 75 match relay { 76 UnownedRelay::Websocket(websocket) => { 77 if !websocket.is_connected() || self.cache.to_send.is_empty() { 78 return; 79 } 80 81 for item in self.cache.to_send.drain(..) { 82 websocket.conn.send(&ClientMessage::Event(item)); 83 } 84 } 85 UnownedRelay::Multicast(multicast) => { 86 if multicast.status() != RelayStatus::Connected || self.cache.to_send.is_empty() { 87 return; 88 } 89 90 self.cache.to_send.retain(|m| multicast.send(m).is_err()); 91 } 92 } 93 } 94 }