multicast.rs (6073B)
1 use ewebsock::{WsEvent, WsMessage}; 2 use mio::net::UdpSocket; 3 use std::io; 4 use std::net::IpAddr; 5 use std::net::{SocketAddr, SocketAddrV4}; 6 use std::time::{Duration, Instant}; 7 8 use crate::relay::{BroadcastCache, BroadcastRelay, RawEventData, RelayImplType}; 9 use crate::{EventClientMessage, RelayStatus, Result, Wakeup}; 10 use std::net::Ipv4Addr; 11 use tracing::{debug, error}; 12 13 pub struct MulticastRelay { 14 last_join: Instant, 15 status: RelayStatus, 16 address: SocketAddrV4, 17 socket: UdpSocket, 18 interface: Ipv4Addr, 19 } 20 21 impl MulticastRelay { 22 pub fn new(address: SocketAddrV4, socket: UdpSocket, interface: Ipv4Addr) -> Self { 23 let last_join = Instant::now(); 24 let status = RelayStatus::Connected; 25 MulticastRelay { 26 status, 27 address, 28 socket, 29 interface, 30 last_join, 31 } 32 } 33 34 /// Multicast seems to fail every 260 seconds. We force a rejoin every 200 seconds or 35 /// so to ensure we are always in the group 36 pub fn rejoin(&mut self) -> Result<()> { 37 self.last_join = Instant::now(); 38 self.status = RelayStatus::Disconnected; 39 self.socket 40 .leave_multicast_v4(self.address.ip(), &self.interface)?; 41 self.socket 42 .join_multicast_v4(self.address.ip(), &self.interface)?; 43 self.status = RelayStatus::Connected; 44 Ok(()) 45 } 46 47 pub fn should_rejoin(&self) -> bool { 48 (Instant::now() - self.last_join) >= Duration::from_secs(200) 49 } 50 51 pub fn try_recv(&self) -> Option<WsEvent> { 52 let mut buffer = [0u8; 65535]; 53 // Read the size header 54 match self.socket.recv_from(&mut buffer) { 55 Ok((size, src)) => { 56 let parsed_size = u32::from_be_bytes(buffer[0..4].try_into().ok()?) as usize; 57 debug!("multicast: read size {} from start of header", size - 4); 58 59 if size != parsed_size + 4 { 60 error!( 61 "multicast: partial data received: expected {}, got {}", 62 parsed_size, size 63 ); 64 return None; 65 } 66 67 let text = String::from_utf8_lossy(&buffer[4..size]); 68 debug!("multicast: received {} bytes from {}: {}", size, src, &text); 69 Some(WsEvent::Message(WsMessage::Text(text.to_string()))) 70 } 71 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 72 // No data available, continue 73 None 74 } 75 Err(e) => { 76 error!("multicast: error receiving data: {}", e); 77 None 78 } 79 } 80 } 81 82 pub fn send(&self, msg: &EventClientMessage) -> Result<()> { 83 let json = msg.to_json(); 84 let len = json.len(); 85 86 debug!("writing to multicast relay"); 87 let mut buf: Vec<u8> = Vec::with_capacity(4 + len); 88 89 // Write the length of the message as 4 bytes (big-endian) 90 buf.extend_from_slice(&(len as u32).to_be_bytes()); 91 92 // Append the JSON message bytes 93 buf.extend_from_slice(json.as_bytes()); 94 95 self.socket.send_to(&buf, SocketAddr::V4(self.address))?; 96 Ok(()) 97 } 98 99 pub fn status(&self) -> RelayStatus { 100 self.status 101 } 102 } 103 104 pub fn setup_multicast_relay( 105 wakeup: impl Fn() + Send + Sync + Clone + 'static, 106 ) -> Result<MulticastRelay> { 107 use mio::{Events, Interest, Poll, Token}; 108 109 let port = 9797; 110 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); 111 let multicast_ip = Ipv4Addr::new(239, 19, 88, 1); 112 113 let mut socket = UdpSocket::bind(address)?; 114 let interface = Ipv4Addr::UNSPECIFIED; 115 let multicast_address = SocketAddrV4::new(multicast_ip, port); 116 117 socket.join_multicast_v4(&multicast_ip, &interface)?; 118 119 let mut poll = Poll::new()?; 120 poll.registry().register( 121 &mut socket, 122 Token(0), 123 Interest::READABLE | Interest::WRITABLE, 124 )?; 125 126 // wakeup our render thread when we have new stuff on the socket 127 std::thread::spawn(move || { 128 let mut events = Events::with_capacity(1); 129 loop { 130 if let Err(err) = poll.poll(&mut events, None) { 131 error!("multicast socket poll error: {err}. ending multicast poller."); 132 return; 133 } 134 wakeup(); 135 136 std::thread::yield_now(); 137 } 138 }); 139 140 Ok(MulticastRelay::new(multicast_address, socket, interface)) 141 } 142 /// MulticastRelayCache lazily initializes the multicast connection and buffers 143 /// outbound events until a connection is available. 144 #[derive(Default)] 145 pub struct MulticastRelayCache { 146 multicast: Option<MulticastRelay>, 147 cache: BroadcastCache, 148 } 149 150 impl MulticastRelayCache { 151 pub fn is_setup(&self) -> bool { 152 self.multicast.is_some() 153 } 154 155 pub fn try_setup<W>(&mut self, wakeup: &W) 156 where 157 W: Wakeup, 158 { 159 let wake = wakeup.clone(); 160 let Ok(multicast) = setup_multicast_relay(move || wake.wake()) else { 161 return; 162 }; 163 164 self.multicast = Some(multicast); 165 } 166 167 pub fn broadcast(&mut self, msg: EventClientMessage) { 168 BroadcastRelay::multicast(self.multicast.as_mut(), &mut self.cache).broadcast(msg); 169 } 170 171 #[profiling::function] 172 pub fn try_recv<F>(&mut self, mut process: F) 173 where 174 for<'a> F: FnMut(RawEventData<'a>), 175 { 176 let Some(multicast) = &mut self.multicast else { 177 return; 178 }; 179 180 if multicast.should_rejoin() { 181 if let Err(e) = multicast.rejoin() { 182 tracing::error!("multicast: rejoin error: {e}"); 183 } 184 } 185 186 BroadcastRelay::multicast(Some(multicast), &mut self.cache).try_flush_queue(); 187 188 let Some(WsEvent::Message(WsMessage::Text(text))) = multicast.try_recv() else { 189 return; 190 }; 191 192 process(RawEventData { 193 url: "multicast", 194 event_json: &text, 195 relay_type: RelayImplType::Multicast, 196 }); 197 } 198 }