notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

subs_debug.rs (7910B)


      1 use std::{collections::HashMap, mem, time::SystemTime};
      2 
      3 use ewebsock::WsMessage;
      4 use nostrdb::Filter;
      5 
      6 use crate::{ClientMessage, Error, RelayEvent, RelayMessage};
      7 
      8 use super::message::calculate_command_result_size;
      9 
/// Key identifying a relay in [`SubsDebug`]'s per-relay map.
type RelayId = String;
/// Key identifying a subscription within a single relay's statistics.
type SubId = String;
     12 
/// Collects debugging statistics about relay traffic: per-relay and
/// per-subscription byte counters plus a log of sent/received events.
pub struct SubsDebug {
    /// Statistics for every relay seen so far, keyed by relay id.
    data: HashMap<RelayId, RelayStats>,
    /// Time at which the counters were last rolled over by
    /// `try_increment_stats`.
    time_incd: SystemTime,
    /// Relay currently selected by the consumer of this struct, if any.
    /// NOTE(review): not read in this file; presumably drives a UI that shows
    /// one relay's event log — confirm against the caller.
    pub relay_events_selection: Option<RelayId>,
}
     18 
/// Everything recorded for a single relay.
#[derive(Default)]
pub struct RelayStats {
    /// Byte counters for all traffic to and from this relay.
    pub count: TransferStats,
    /// Log of every message sent to and event received from this relay, in
    /// the order they were recorded.
    pub events: Vec<RelayLogEvent>,
    /// Statistics per subscription id; an entry is inserted when a `Req` is
    /// sent and removed when the matching `Close` is sent.
    pub sub_data: HashMap<SubId, SubStats>,
}
     25 
/// One entry in a relay's event log.
#[derive(Clone)]
pub enum RelayLogEvent {
    /// A client message that was sent to the relay.
    Send(ClientMessage),
    /// An event received from the relay, stored in owned form.
    /// (The variant name's spelling is part of the public interface.)
    Recieve(OwnedRelayEvent),
}
     31 
/// Owned, string-based snapshot of a `RelayEvent`, suitable for keeping in a
/// log after the borrowed original is gone.
#[derive(Clone)]
pub enum OwnedRelayEvent {
    /// Mirrors `RelayEvent::Opened`.
    Opened,
    /// Mirrors `RelayEvent::Closed`.
    Closed,
    /// A websocket message, rendered as a short textual description.
    Other(String),
    /// An error, rendered with its `to_string()` text.
    Error(String),
    /// A relay message, rendered as `"<KIND>:<payload>"` (or just `"OK"`).
    Message(String),
}
     40 
     41 impl From<RelayEvent<'_>> for OwnedRelayEvent {
     42     fn from(value: RelayEvent<'_>) -> Self {
     43         match value {
     44             RelayEvent::Opened => OwnedRelayEvent::Opened,
     45             RelayEvent::Closed => OwnedRelayEvent::Closed,
     46             RelayEvent::Other(ws_message) => {
     47                 let ws_str = match ws_message {
     48                     WsMessage::Binary(_) => "Binary".to_owned(),
     49                     WsMessage::Text(t) => format!("Text:{}", t),
     50                     WsMessage::Unknown(u) => format!("Unknown:{}", u),
     51                     WsMessage::Ping(_) => "Ping".to_owned(),
     52                     WsMessage::Pong(_) => "Pong".to_owned(),
     53                 };
     54                 OwnedRelayEvent::Other(ws_str)
     55             }
     56             RelayEvent::Error(error) => OwnedRelayEvent::Error(error.to_string()),
     57             RelayEvent::Message(relay_message) => {
     58                 let relay_msg = match relay_message {
     59                     RelayMessage::OK(_) => "OK".to_owned(),
     60                     RelayMessage::Eose(s) => format!("EOSE:{}", s),
     61                     RelayMessage::Event(_, s) => format!("EVENT:{}", s),
     62                     RelayMessage::Notice(s) => format!("NOTICE:{}", s),
     63                 };
     64                 OwnedRelayEvent::Message(relay_msg)
     65             }
     66         }
     67     }
     68 }
     69 
/// A subscription id paired with its filter text; hashable and comparable so
/// it can serve as a map/set key.
/// NOTE(review): not referenced elsewhere in this file — confirm it is still
/// used by other modules.
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct RelaySub {
    /// Subscription id.
    pub(crate) subid: String,
    /// Textual form of the subscription's filter(s).
    pub(crate) filter: String,
}
     75 
/// Statistics for a single subscription on a single relay.
#[derive(Default)]
pub struct SubStats {
    /// The subscription's filters as JSON, joined with `", "`.
    pub filter: String,
    /// Byte counters attributed to this subscription.
    pub count: TransferStats,
}
     81 
/// Upload/download byte counters, tracked as running totals plus two
/// per-window values that are rotated on every stats tick.
#[derive(Default)]
pub struct TransferStats {
    /// Bytes sent in all completed windows (the current window is only added
    /// at the next tick).
    pub up_total: usize,
    /// Bytes received in all completed windows (the current window is only
    /// added at the next tick).
    pub down_total: usize,

    // Bytes transferred during the most recently completed window, i.e. the
    // value the "current" counters had when the last tick happened.
    pub up_sec_prior: usize,
    pub down_sec_prior: usize,

    // Bytes transferred since the last tick; folded into the totals and the
    // "prior" counters, then reset to zero, on the next tick.
    up_sec_cur: usize,
    down_sec_cur: usize,
}
     95 
     96 impl Default for SubsDebug {
     97     fn default() -> Self {
     98         Self {
     99             data: Default::default(),
    100             time_incd: SystemTime::now(),
    101             relay_events_selection: None,
    102         }
    103     }
    104 }
    105 
    106 impl SubsDebug {
    107     pub fn get_data(&self) -> &HashMap<RelayId, RelayStats> {
    108         &self.data
    109     }
    110 
    111     pub(crate) fn send_cmd(&mut self, relay: String, cmd: &ClientMessage) {
    112         let data = self.data.entry(relay).or_default();
    113         let msg_num_bytes = calculate_client_message_size(cmd);
    114         match cmd {
    115             ClientMessage::Req { sub_id, filters } => {
    116                 data.sub_data.insert(
    117                     sub_id.to_string(),
    118                     SubStats {
    119                         filter: filters_to_string(filters),
    120                         count: Default::default(),
    121                     },
    122                 );
    123             }
    124 
    125             ClientMessage::Close { sub_id } => {
    126                 data.sub_data.remove(sub_id);
    127             }
    128 
    129             _ => {}
    130         }
    131 
    132         data.count.up_sec_cur += msg_num_bytes;
    133 
    134         data.events.push(RelayLogEvent::Send(cmd.clone()));
    135     }
    136 
    137     pub(crate) fn receive_cmd(&mut self, relay: String, cmd: RelayEvent) {
    138         let data = self.data.entry(relay).or_default();
    139         let msg_num_bytes = calculate_relay_event_size(&cmd);
    140         if let RelayEvent::Message(RelayMessage::Event(sid, _)) = cmd {
    141             if let Some(sub_data) = data.sub_data.get_mut(sid) {
    142                 let c = &mut sub_data.count;
    143                 c.down_sec_cur += msg_num_bytes;
    144             }
    145         };
    146 
    147         data.count.down_sec_cur += msg_num_bytes;
    148 
    149         data.events.push(RelayLogEvent::Recieve(cmd.into()));
    150     }
    151 
    152     pub fn try_increment_stats(&mut self) {
    153         let cur_time = SystemTime::now();
    154         if let Ok(dur) = cur_time.duration_since(self.time_incd) {
    155             if dur.as_secs() >= 1 {
    156                 self.time_incd = cur_time;
    157                 self.internal_inc_stats();
    158             }
    159         }
    160     }
    161 
    162     fn internal_inc_stats(&mut self) {
    163         for relay_data in self.data.values_mut() {
    164             let c = &mut relay_data.count;
    165             inc_data_count(c);
    166 
    167             for sub in relay_data.sub_data.values_mut() {
    168                 inc_data_count(&mut sub.count);
    169             }
    170         }
    171     }
    172 }
    173 
    174 fn inc_data_count(c: &mut TransferStats) {
    175     c.up_total += c.up_sec_cur;
    176     c.up_sec_prior = c.up_sec_cur;
    177 
    178     c.down_total += c.down_sec_cur;
    179     c.down_sec_prior = c.down_sec_cur;
    180 
    181     c.up_sec_cur = 0;
    182     c.down_sec_cur = 0;
    183 }
    184 
    185 fn calculate_client_message_size(message: &ClientMessage) -> usize {
    186     match message {
    187         ClientMessage::Event(note) => note.note_json.len() + 10, // 10 is ["EVENT",]
    188         ClientMessage::Req { sub_id, filters } => {
    189             mem::size_of_val(message)
    190                 + mem::size_of_val(sub_id)
    191                 + sub_id.len()
    192                 + filters.iter().map(mem::size_of_val).sum::<usize>()
    193         }
    194         ClientMessage::Close { sub_id } => {
    195             mem::size_of_val(message) + mem::size_of_val(sub_id) + sub_id.len()
    196         }
    197         ClientMessage::Raw(data) => mem::size_of_val(message) + data.len(),
    198     }
    199 }
    200 
    201 fn calculate_relay_event_size(event: &RelayEvent<'_>) -> usize {
    202     let base_size = mem::size_of_val(event); // Size of the enum on the stack
    203 
    204     let variant_size = match event {
    205         RelayEvent::Opened | RelayEvent::Closed => 0, // No additional data
    206         RelayEvent::Other(ws_message) => calculate_ws_message_size(ws_message),
    207         RelayEvent::Error(error) => calculate_error_size(error),
    208         RelayEvent::Message(message) => calculate_relay_message_size(message),
    209     };
    210 
    211     base_size + variant_size
    212 }
    213 
    214 fn calculate_ws_message_size(message: &WsMessage) -> usize {
    215     match message {
    216         WsMessage::Binary(vec) | WsMessage::Ping(vec) | WsMessage::Pong(vec) => {
    217             mem::size_of_val(message) + vec.len()
    218         }
    219         WsMessage::Text(string) | WsMessage::Unknown(string) => {
    220             mem::size_of_val(message) + string.len()
    221         }
    222     }
    223 }
    224 
    225 fn calculate_error_size(error: &Error) -> usize {
    226     match error {
    227         Error::Empty
    228         | Error::DecodeFailed
    229         | Error::HexDecodeFailed
    230         | Error::InvalidBech32
    231         | Error::InvalidByteSize
    232         | Error::InvalidSignature
    233         | Error::Io(_)
    234         | Error::InvalidPublicKey => mem::size_of_val(error), // No heap usage
    235 
    236         Error::Json(json_err) => mem::size_of_val(error) + json_err.to_string().len(),
    237 
    238         Error::Nostrdb(nostrdb_err) => mem::size_of_val(error) + nostrdb_err.to_string().len(),
    239 
    240         Error::Generic(string) => mem::size_of_val(error) + string.len(),
    241     }
    242 }
    243 
    244 fn calculate_relay_message_size(message: &RelayMessage) -> usize {
    245     match message {
    246         RelayMessage::OK(result) => calculate_command_result_size(result),
    247         RelayMessage::Eose(str_ref)
    248         | RelayMessage::Event(str_ref, _)
    249         | RelayMessage::Notice(str_ref) => mem::size_of_val(message) + str_ref.len(),
    250     }
    251 }
    252 
    253 fn filters_to_string(f: &Vec<Filter>) -> String {
    254     let mut cur_str = String::new();
    255     for filter in f {
    256         if let Ok(json) = filter.json() {
    257             if !cur_str.is_empty() {
    258                 cur_str.push_str(", ");
    259             }
    260             cur_str.push_str(&json);
    261         }
    262     }
    263 
    264     cur_str
    265 }