notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

broadcast.rs (2796B)


      1 use crate::{
      2     relay::{MulticastRelay, UnownedRelay, WebsocketRelay},
      3     ClientMessage, EventClientMessage, RelayStatus,
      4 };
      5 
      6 /// BroadcastCache stores queued events for relays that are temporarily disconnected.
      7 #[derive(Default)]
      8 pub struct BroadcastCache {
      9     to_send: Vec<EventClientMessage>,
     10 }
     11 
     12 /// BroadcastRelay sends events to either a websocket relay or the multicast relay
     13 /// while handling retries via the shared cache.
     14 pub struct BroadcastRelay<'a> {
     15     relay: Option<UnownedRelay<'a>>,
     16     cache: &'a mut BroadcastCache,
     17 }
     18 
     19 impl<'a> BroadcastRelay<'a> {
     20     pub fn websocket(
     21         websocket: Option<&'a mut WebsocketRelay>,
     22         cache: &'a mut BroadcastCache,
     23     ) -> Self {
     24         Self {
     25             relay: websocket.map(UnownedRelay::Websocket),
     26             cache,
     27         }
     28     }
     29 
     30     pub fn multicast(
     31         multicast: Option<&'a mut MulticastRelay>,
     32         cache: &'a mut BroadcastCache,
     33     ) -> Self {
     34         Self {
     35             relay: multicast.map(UnownedRelay::Multicast),
     36             cache,
     37         }
     38     }
     39 
     40     pub fn broadcast(&mut self, msg: EventClientMessage) {
     41         let Some(relay) = &mut self.relay else {
     42             self.cache.to_send.push(msg);
     43             return;
     44         };
     45 
     46         match relay {
     47             UnownedRelay::Websocket(websocket_relay) => {
     48                 if !websocket_relay.is_connected() {
     49                     self.cache.to_send.push(msg);
     50                     return;
     51                 }
     52 
     53                 websocket_relay.conn.send(&ClientMessage::Event(msg));
     54             }
     55             UnownedRelay::Multicast(multicast) => {
     56                 // Always queue if we're not connected.
     57                 if multicast.status() != RelayStatus::Connected {
     58                     self.cache.to_send.push(msg.clone());
     59                     return;
     60                 }
     61 
     62                 if multicast.send(&msg).is_err() {
     63                     self.cache.to_send.push(msg.clone());
     64                 }
     65             }
     66         }
     67     }
     68 
     69     #[profiling::function]
     70     pub fn try_flush_queue(&mut self) {
     71         let Some(relay) = &mut self.relay else {
     72             return;
     73         };
     74 
     75         match relay {
     76             UnownedRelay::Websocket(websocket) => {
     77                 if !websocket.is_connected() || self.cache.to_send.is_empty() {
     78                     return;
     79                 }
     80 
     81                 for item in self.cache.to_send.drain(..) {
     82                     websocket.conn.send(&ClientMessage::Event(item));
     83                 }
     84             }
     85             UnownedRelay::Multicast(multicast) => {
     86                 if multicast.status() != RelayStatus::Connected || self.cache.to_send.is_empty() {
     87                     return;
     88                 }
     89 
     90                 self.cache.to_send.retain(|m| multicast.send(m).is_err());
     91             }
     92         }
     93     }
     94 }