mod.rs (6502B)
1 use ewebsock::{Options, WsEvent, WsMessage, WsReceiver, WsSender}; 2 use mio::net::UdpSocket; 3 use std::io; 4 use std::net::IpAddr; 5 use std::net::{SocketAddr, SocketAddrV4}; 6 use std::time::{Duration, Instant}; 7 8 use crate::{ClientMessage, EventClientMessage, Result}; 9 use std::fmt; 10 use std::hash::{Hash, Hasher}; 11 use std::net::Ipv4Addr; 12 use tracing::{debug, error}; 13 14 pub mod message; 15 pub mod pool; 16 pub mod subs_debug; 17 18 #[derive(Debug, Copy, Clone)] 19 pub enum RelayStatus { 20 Connected, 21 Connecting, 22 Disconnected, 23 } 24 25 pub struct MulticastRelay { 26 last_join: Instant, 27 status: RelayStatus, 28 address: SocketAddrV4, 29 socket: UdpSocket, 30 interface: Ipv4Addr, 31 } 32 33 impl MulticastRelay { 34 pub fn new(address: SocketAddrV4, socket: UdpSocket, interface: Ipv4Addr) -> Self { 35 let last_join = Instant::now(); 36 let status = RelayStatus::Connected; 37 MulticastRelay { 38 status, 39 address, 40 socket, 41 interface, 42 last_join, 43 } 44 } 45 46 /// Multicast seems to fail every 260 seconds. We force a rejoin every 200 seconds or 47 /// so to ensure we are always in the group 48 pub fn rejoin(&mut self) -> Result<()> { 49 self.last_join = Instant::now(); 50 self.status = RelayStatus::Disconnected; 51 self.socket 52 .leave_multicast_v4(self.address.ip(), &self.interface)?; 53 self.socket 54 .join_multicast_v4(self.address.ip(), &self.interface)?; 55 self.status = RelayStatus::Connected; 56 Ok(()) 57 } 58 59 pub fn should_rejoin(&self) -> bool { 60 (Instant::now() - self.last_join) >= Duration::from_secs(200) 61 } 62 63 pub fn try_recv(&self) -> Option<WsEvent> { 64 let mut buffer = [0u8; 65535]; 65 // Read the size header 66 match self.socket.recv_from(&mut buffer) { 67 Ok((size, src)) => { 68 let parsed_size = u32::from_be_bytes(buffer[0..4].try_into().ok()?) as usize; 69 debug!("multicast: read size {} from start of header", size - 4); 70 71 if size != parsed_size + 4 { 72 error!( 73 "multicast: partial data received: expected {}, got {}", 74 parsed_size, size 75 ); 76 return None; 77 } 78 79 let text = String::from_utf8_lossy(&buffer[4..size]); 80 debug!("multicast: received {} bytes from {}: {}", size, src, &text); 81 Some(WsEvent::Message(WsMessage::Text(text.to_string()))) 82 } 83 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 84 // No data available, continue 85 None 86 } 87 Err(e) => { 88 error!("multicast: error receiving data: {}", e); 89 None 90 } 91 } 92 } 93 94 pub fn send(&self, msg: &EventClientMessage) -> Result<()> { 95 let json = msg.to_json(); 96 let len = json.len(); 97 98 debug!("writing to multicast relay"); 99 let mut buf: Vec<u8> = Vec::with_capacity(4 + len); 100 101 // Write the length of the message as 4 bytes (big-endian) 102 buf.extend_from_slice(&(len as u32).to_be_bytes()); 103 104 // Append the JSON message bytes 105 buf.extend_from_slice(json.as_bytes()); 106 107 self.socket.send_to(&buf, SocketAddr::V4(self.address))?; 108 Ok(()) 109 } 110 } 111 112 pub fn setup_multicast_relay( 113 wakeup: impl Fn() + Send + Sync + Clone + 'static, 114 ) -> Result<MulticastRelay> { 115 use mio::{Events, Interest, Poll, Token}; 116 117 let port = 9797; 118 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); 119 let multicast_ip = Ipv4Addr::new(239, 19, 88, 1); 120 121 let mut socket = UdpSocket::bind(address)?; 122 let interface = Ipv4Addr::UNSPECIFIED; 123 let multicast_address = SocketAddrV4::new(multicast_ip, port); 124 125 socket.join_multicast_v4(&multicast_ip, &interface)?; 126 127 let mut poll = Poll::new()?; 128 poll.registry().register( 129 &mut socket, 130 Token(0), 131 Interest::READABLE | Interest::WRITABLE, 132 )?; 133 134 // wakeup our render thread when we have new stuff on the socket 135 std::thread::spawn(move || { 136 let mut events = Events::with_capacity(1); 137 loop { 138 if let Err(err) = poll.poll(&mut events, Some(Duration::from_millis(100))) { 139 error!("multicast socket poll error: {err}. ending multicast poller."); 140 return; 141 } 142 wakeup(); 143 144 std::thread::yield_now(); 145 } 146 }); 147 148 Ok(MulticastRelay::new(multicast_address, socket, interface)) 149 } 150 151 pub struct Relay { 152 pub url: String, 153 pub status: RelayStatus, 154 pub sender: WsSender, 155 pub receiver: WsReceiver, 156 } 157 158 impl fmt::Debug for Relay { 159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 160 f.debug_struct("Relay") 161 .field("url", &self.url) 162 .field("status", &self.status) 163 .finish() 164 } 165 } 166 167 impl Hash for Relay { 168 fn hash<H: Hasher>(&self, state: &mut H) { 169 // Hashes the Relay by hashing the URL 170 self.url.hash(state); 171 } 172 } 173 174 impl PartialEq for Relay { 175 fn eq(&self, other: &Self) -> bool { 176 self.url == other.url 177 } 178 } 179 180 impl Eq for Relay {} 181 182 impl Relay { 183 pub fn new(url: String, wakeup: impl Fn() + Send + Sync + 'static) -> Result<Self> { 184 let status = RelayStatus::Connecting; 185 let (sender, receiver) = ewebsock::connect_with_wakeup(&url, Options::default(), wakeup)?; 186 187 Ok(Self { 188 url, 189 sender, 190 receiver, 191 status, 192 }) 193 } 194 195 pub fn send(&mut self, msg: &ClientMessage) { 196 let json = match msg.to_json() { 197 Ok(json) => { 198 debug!("sending {} to {}", json, self.url); 199 json 200 } 201 Err(e) => { 202 error!("error serializing json for filter: {e}"); 203 return; 204 } 205 }; 206 207 let txt = WsMessage::Text(json); 208 self.sender.send(txt); 209 } 210 211 pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> { 212 let (sender, receiver) = 213 ewebsock::connect_with_wakeup(&self.url, Options::default(), wakeup)?; 214 self.status = RelayStatus::Connecting; 215 self.sender = sender; 216 self.receiver = receiver; 217 Ok(()) 218 } 219 220 pub fn ping(&mut self) { 221 let msg = WsMessage::Ping(vec![]); 222 self.sender.send(msg); 223 } 224 }