notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

mod.rs (6502B)


      1 use ewebsock::{Options, WsEvent, WsMessage, WsReceiver, WsSender};
      2 use mio::net::UdpSocket;
      3 use std::io;
      4 use std::net::IpAddr;
      5 use std::net::{SocketAddr, SocketAddrV4};
      6 use std::time::{Duration, Instant};
      7 
      8 use crate::{ClientMessage, EventClientMessage, Result};
      9 use std::fmt;
     10 use std::hash::{Hash, Hasher};
     11 use std::net::Ipv4Addr;
     12 use tracing::{debug, error};
     13 
     14 pub mod message;
     15 pub mod pool;
     16 pub mod subs_debug;
     17 
     18 #[derive(Debug, Copy, Clone)]
     19 pub enum RelayStatus {
     20     Connected,
     21     Connecting,
     22     Disconnected,
     23 }
     24 
     25 pub struct MulticastRelay {
     26     last_join: Instant,
     27     status: RelayStatus,
     28     address: SocketAddrV4,
     29     socket: UdpSocket,
     30     interface: Ipv4Addr,
     31 }
     32 
     33 impl MulticastRelay {
     34     pub fn new(address: SocketAddrV4, socket: UdpSocket, interface: Ipv4Addr) -> Self {
     35         let last_join = Instant::now();
     36         let status = RelayStatus::Connected;
     37         MulticastRelay {
     38             status,
     39             address,
     40             socket,
     41             interface,
     42             last_join,
     43         }
     44     }
     45 
     46     /// Multicast seems to fail every 260 seconds. We force a rejoin every 200 seconds or
     47     /// so to ensure we are always in the group
     48     pub fn rejoin(&mut self) -> Result<()> {
     49         self.last_join = Instant::now();
     50         self.status = RelayStatus::Disconnected;
     51         self.socket
     52             .leave_multicast_v4(self.address.ip(), &self.interface)?;
     53         self.socket
     54             .join_multicast_v4(self.address.ip(), &self.interface)?;
     55         self.status = RelayStatus::Connected;
     56         Ok(())
     57     }
     58 
     59     pub fn should_rejoin(&self) -> bool {
     60         (Instant::now() - self.last_join) >= Duration::from_secs(200)
     61     }
     62 
     63     pub fn try_recv(&self) -> Option<WsEvent> {
     64         let mut buffer = [0u8; 65535];
     65         // Read the size header
     66         match self.socket.recv_from(&mut buffer) {
     67             Ok((size, src)) => {
     68                 let parsed_size = u32::from_be_bytes(buffer[0..4].try_into().ok()?) as usize;
     69                 debug!("multicast: read size {} from start of header", size - 4);
     70 
     71                 if size != parsed_size + 4 {
     72                     error!(
     73                         "multicast: partial data received: expected {}, got {}",
     74                         parsed_size, size
     75                     );
     76                     return None;
     77                 }
     78 
     79                 let text = String::from_utf8_lossy(&buffer[4..size]);
     80                 debug!("multicast: received {} bytes from {}: {}", size, src, &text);
     81                 Some(WsEvent::Message(WsMessage::Text(text.to_string())))
     82             }
     83             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
     84                 // No data available, continue
     85                 None
     86             }
     87             Err(e) => {
     88                 error!("multicast: error receiving data: {}", e);
     89                 None
     90             }
     91         }
     92     }
     93 
     94     pub fn send(&self, msg: &EventClientMessage) -> Result<()> {
     95         let json = msg.to_json();
     96         let len = json.len();
     97 
     98         debug!("writing to multicast relay");
     99         let mut buf: Vec<u8> = Vec::with_capacity(4 + len);
    100 
    101         // Write the length of the message as 4 bytes (big-endian)
    102         buf.extend_from_slice(&(len as u32).to_be_bytes());
    103 
    104         // Append the JSON message bytes
    105         buf.extend_from_slice(json.as_bytes());
    106 
    107         self.socket.send_to(&buf, SocketAddr::V4(self.address))?;
    108         Ok(())
    109     }
    110 }
    111 
    112 pub fn setup_multicast_relay(
    113     wakeup: impl Fn() + Send + Sync + Clone + 'static,
    114 ) -> Result<MulticastRelay> {
    115     use mio::{Events, Interest, Poll, Token};
    116 
    117     let port = 9797;
    118     let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
    119     let multicast_ip = Ipv4Addr::new(239, 19, 88, 1);
    120 
    121     let mut socket = UdpSocket::bind(address)?;
    122     let interface = Ipv4Addr::UNSPECIFIED;
    123     let multicast_address = SocketAddrV4::new(multicast_ip, port);
    124 
    125     socket.join_multicast_v4(&multicast_ip, &interface)?;
    126 
    127     let mut poll = Poll::new()?;
    128     poll.registry().register(
    129         &mut socket,
    130         Token(0),
    131         Interest::READABLE | Interest::WRITABLE,
    132     )?;
    133 
    134     // wakeup our render thread when we have new stuff on the socket
    135     std::thread::spawn(move || {
    136         let mut events = Events::with_capacity(1);
    137         loop {
    138             if let Err(err) = poll.poll(&mut events, Some(Duration::from_millis(100))) {
    139                 error!("multicast socket poll error: {err}. ending multicast poller.");
    140                 return;
    141             }
    142             wakeup();
    143 
    144             std::thread::yield_now();
    145         }
    146     });
    147 
    148     Ok(MulticastRelay::new(multicast_address, socket, interface))
    149 }
    150 
    151 pub struct Relay {
    152     pub url: String,
    153     pub status: RelayStatus,
    154     pub sender: WsSender,
    155     pub receiver: WsReceiver,
    156 }
    157 
    158 impl fmt::Debug for Relay {
    159     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    160         f.debug_struct("Relay")
    161             .field("url", &self.url)
    162             .field("status", &self.status)
    163             .finish()
    164     }
    165 }
    166 
    167 impl Hash for Relay {
    168     fn hash<H: Hasher>(&self, state: &mut H) {
    169         // Hashes the Relay by hashing the URL
    170         self.url.hash(state);
    171     }
    172 }
    173 
    174 impl PartialEq for Relay {
    175     fn eq(&self, other: &Self) -> bool {
    176         self.url == other.url
    177     }
    178 }
    179 
    180 impl Eq for Relay {}
    181 
    182 impl Relay {
    183     pub fn new(url: String, wakeup: impl Fn() + Send + Sync + 'static) -> Result<Self> {
    184         let status = RelayStatus::Connecting;
    185         let (sender, receiver) = ewebsock::connect_with_wakeup(&url, Options::default(), wakeup)?;
    186 
    187         Ok(Self {
    188             url,
    189             sender,
    190             receiver,
    191             status,
    192         })
    193     }
    194 
    195     pub fn send(&mut self, msg: &ClientMessage) {
    196         let json = match msg.to_json() {
    197             Ok(json) => {
    198                 debug!("sending {} to {}", json, self.url);
    199                 json
    200             }
    201             Err(e) => {
    202                 error!("error serializing json for filter: {e}");
    203                 return;
    204             }
    205         };
    206 
    207         let txt = WsMessage::Text(json);
    208         self.sender.send(txt);
    209     }
    210 
    211     pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> {
    212         let (sender, receiver) =
    213             ewebsock::connect_with_wakeup(&self.url, Options::default(), wakeup)?;
    214         self.status = RelayStatus::Connecting;
    215         self.sender = sender;
    216         self.receiver = receiver;
    217         Ok(())
    218     }
    219 
    220     pub fn ping(&mut self) {
    221         let msg = WsMessage::Ping(vec![]);
    222         self.sender.send(msg);
    223     }
    224 }