notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

multicast.rs (6073B)


      1 use ewebsock::{WsEvent, WsMessage};
      2 use mio::net::UdpSocket;
      3 use std::io;
      4 use std::net::IpAddr;
      5 use std::net::{SocketAddr, SocketAddrV4};
      6 use std::time::{Duration, Instant};
      7 
      8 use crate::relay::{BroadcastCache, BroadcastRelay, RawEventData, RelayImplType};
      9 use crate::{EventClientMessage, RelayStatus, Result, Wakeup};
     10 use std::net::Ipv4Addr;
     11 use tracing::{debug, error};
     12 
     13 pub struct MulticastRelay {
     14     last_join: Instant,
     15     status: RelayStatus,
     16     address: SocketAddrV4,
     17     socket: UdpSocket,
     18     interface: Ipv4Addr,
     19 }
     20 
     21 impl MulticastRelay {
     22     pub fn new(address: SocketAddrV4, socket: UdpSocket, interface: Ipv4Addr) -> Self {
     23         let last_join = Instant::now();
     24         let status = RelayStatus::Connected;
     25         MulticastRelay {
     26             status,
     27             address,
     28             socket,
     29             interface,
     30             last_join,
     31         }
     32     }
     33 
     34     /// Multicast seems to fail every 260 seconds. We force a rejoin every 200 seconds or
     35     /// so to ensure we are always in the group
     36     pub fn rejoin(&mut self) -> Result<()> {
     37         self.last_join = Instant::now();
     38         self.status = RelayStatus::Disconnected;
     39         self.socket
     40             .leave_multicast_v4(self.address.ip(), &self.interface)?;
     41         self.socket
     42             .join_multicast_v4(self.address.ip(), &self.interface)?;
     43         self.status = RelayStatus::Connected;
     44         Ok(())
     45     }
     46 
     47     pub fn should_rejoin(&self) -> bool {
     48         (Instant::now() - self.last_join) >= Duration::from_secs(200)
     49     }
     50 
     51     pub fn try_recv(&self) -> Option<WsEvent> {
     52         let mut buffer = [0u8; 65535];
     53         // Read the size header
     54         match self.socket.recv_from(&mut buffer) {
     55             Ok((size, src)) => {
     56                 let parsed_size = u32::from_be_bytes(buffer[0..4].try_into().ok()?) as usize;
     57                 debug!("multicast: read size {} from start of header", size - 4);
     58 
     59                 if size != parsed_size + 4 {
     60                     error!(
     61                         "multicast: partial data received: expected {}, got {}",
     62                         parsed_size, size
     63                     );
     64                     return None;
     65                 }
     66 
     67                 let text = String::from_utf8_lossy(&buffer[4..size]);
     68                 debug!("multicast: received {} bytes from {}: {}", size, src, &text);
     69                 Some(WsEvent::Message(WsMessage::Text(text.to_string())))
     70             }
     71             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
     72                 // No data available, continue
     73                 None
     74             }
     75             Err(e) => {
     76                 error!("multicast: error receiving data: {}", e);
     77                 None
     78             }
     79         }
     80     }
     81 
     82     pub fn send(&self, msg: &EventClientMessage) -> Result<()> {
     83         let json = msg.to_json();
     84         let len = json.len();
     85 
     86         debug!("writing to multicast relay");
     87         let mut buf: Vec<u8> = Vec::with_capacity(4 + len);
     88 
     89         // Write the length of the message as 4 bytes (big-endian)
     90         buf.extend_from_slice(&(len as u32).to_be_bytes());
     91 
     92         // Append the JSON message bytes
     93         buf.extend_from_slice(json.as_bytes());
     94 
     95         self.socket.send_to(&buf, SocketAddr::V4(self.address))?;
     96         Ok(())
     97     }
     98 
     99     pub fn status(&self) -> RelayStatus {
    100         self.status
    101     }
    102 }
    103 
    104 pub fn setup_multicast_relay(
    105     wakeup: impl Fn() + Send + Sync + Clone + 'static,
    106 ) -> Result<MulticastRelay> {
    107     use mio::{Events, Interest, Poll, Token};
    108 
    109     let port = 9797;
    110     let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
    111     let multicast_ip = Ipv4Addr::new(239, 19, 88, 1);
    112 
    113     let mut socket = UdpSocket::bind(address)?;
    114     let interface = Ipv4Addr::UNSPECIFIED;
    115     let multicast_address = SocketAddrV4::new(multicast_ip, port);
    116 
    117     socket.join_multicast_v4(&multicast_ip, &interface)?;
    118 
    119     let mut poll = Poll::new()?;
    120     poll.registry().register(
    121         &mut socket,
    122         Token(0),
    123         Interest::READABLE | Interest::WRITABLE,
    124     )?;
    125 
    126     // wakeup our render thread when we have new stuff on the socket
    127     std::thread::spawn(move || {
    128         let mut events = Events::with_capacity(1);
    129         loop {
    130             if let Err(err) = poll.poll(&mut events, None) {
    131                 error!("multicast socket poll error: {err}. ending multicast poller.");
    132                 return;
    133             }
    134             wakeup();
    135 
    136             std::thread::yield_now();
    137         }
    138     });
    139 
    140     Ok(MulticastRelay::new(multicast_address, socket, interface))
    141 }
    142 /// MulticastRelayCache lazily initializes the multicast connection and buffers
    143 /// outbound events until a connection is available.
    144 #[derive(Default)]
    145 pub struct MulticastRelayCache {
    146     multicast: Option<MulticastRelay>,
    147     cache: BroadcastCache,
    148 }
    149 
    150 impl MulticastRelayCache {
    151     pub fn is_setup(&self) -> bool {
    152         self.multicast.is_some()
    153     }
    154 
    155     pub fn try_setup<W>(&mut self, wakeup: &W)
    156     where
    157         W: Wakeup,
    158     {
    159         let wake = wakeup.clone();
    160         let Ok(multicast) = setup_multicast_relay(move || wake.wake()) else {
    161             return;
    162         };
    163 
    164         self.multicast = Some(multicast);
    165     }
    166 
    167     pub fn broadcast(&mut self, msg: EventClientMessage) {
    168         BroadcastRelay::multicast(self.multicast.as_mut(), &mut self.cache).broadcast(msg);
    169     }
    170 
    171     #[profiling::function]
    172     pub fn try_recv<F>(&mut self, mut process: F)
    173     where
    174         for<'a> F: FnMut(RawEventData<'a>),
    175     {
    176         let Some(multicast) = &mut self.multicast else {
    177             return;
    178         };
    179 
    180         if multicast.should_rejoin() {
    181             if let Err(e) = multicast.rejoin() {
    182                 tracing::error!("multicast: rejoin error: {e}");
    183             }
    184         }
    185 
    186         BroadcastRelay::multicast(Some(multicast), &mut self.cache).try_flush_queue();
    187 
    188         let Some(WsEvent::Message(WsMessage::Text(text))) = multicast.try_recv() else {
    189             return;
    190         };
    191 
    192         process(RawEventData {
    193             url: "multicast",
    194             event_json: &text,
    195             relay_type: RelayImplType::Multicast,
    196         });
    197     }
    198 }