subs_debug.rs (7910B)
1 use std::{collections::HashMap, mem, time::SystemTime}; 2 3 use ewebsock::WsMessage; 4 use nostrdb::Filter; 5 6 use crate::{ClientMessage, Error, RelayEvent, RelayMessage}; 7 8 use super::message::calculate_command_result_size; 9 10 type RelayId = String; 11 type SubId = String; 12 13 pub struct SubsDebug { 14 data: HashMap<RelayId, RelayStats>, 15 time_incd: SystemTime, 16 pub relay_events_selection: Option<RelayId>, 17 } 18 19 #[derive(Default)] 20 pub struct RelayStats { 21 pub count: TransferStats, 22 pub events: Vec<RelayLogEvent>, 23 pub sub_data: HashMap<SubId, SubStats>, 24 } 25 26 #[derive(Clone)] 27 pub enum RelayLogEvent { 28 Send(ClientMessage), 29 Recieve(OwnedRelayEvent), 30 } 31 32 #[derive(Clone)] 33 pub enum OwnedRelayEvent { 34 Opened, 35 Closed, 36 Other(String), 37 Error(String), 38 Message(String), 39 } 40 41 impl From<RelayEvent<'_>> for OwnedRelayEvent { 42 fn from(value: RelayEvent<'_>) -> Self { 43 match value { 44 RelayEvent::Opened => OwnedRelayEvent::Opened, 45 RelayEvent::Closed => OwnedRelayEvent::Closed, 46 RelayEvent::Other(ws_message) => { 47 let ws_str = match ws_message { 48 WsMessage::Binary(_) => "Binary".to_owned(), 49 WsMessage::Text(t) => format!("Text:{}", t), 50 WsMessage::Unknown(u) => format!("Unknown:{}", u), 51 WsMessage::Ping(_) => "Ping".to_owned(), 52 WsMessage::Pong(_) => "Pong".to_owned(), 53 }; 54 OwnedRelayEvent::Other(ws_str) 55 } 56 RelayEvent::Error(error) => OwnedRelayEvent::Error(error.to_string()), 57 RelayEvent::Message(relay_message) => { 58 let relay_msg = match relay_message { 59 RelayMessage::OK(_) => "OK".to_owned(), 60 RelayMessage::Eose(s) => format!("EOSE:{}", s), 61 RelayMessage::Event(_, s) => format!("EVENT:{}", s), 62 RelayMessage::Notice(s) => format!("NOTICE:{}", s), 63 }; 64 OwnedRelayEvent::Message(relay_msg) 65 } 66 } 67 } 68 } 69 70 #[derive(PartialEq, Eq, Hash, Clone)] 71 pub struct RelaySub { 72 pub(crate) subid: String, 73 pub(crate) filter: String, 74 } 75 76 #[derive(Default)] 77 pub struct SubStats { 78 pub filter: String, 79 pub count: TransferStats, 80 } 81 82 #[derive(Default)] 83 pub struct TransferStats { 84 pub up_total: usize, 85 pub down_total: usize, 86 87 // 1 sec < last tick < 2 sec 88 pub up_sec_prior: usize, 89 pub down_sec_prior: usize, 90 91 // < 1 sec since last tick 92 up_sec_cur: usize, 93 down_sec_cur: usize, 94 } 95 96 impl Default for SubsDebug { 97 fn default() -> Self { 98 Self { 99 data: Default::default(), 100 time_incd: SystemTime::now(), 101 relay_events_selection: None, 102 } 103 } 104 } 105 106 impl SubsDebug { 107 pub fn get_data(&self) -> &HashMap<RelayId, RelayStats> { 108 &self.data 109 } 110 111 pub(crate) fn send_cmd(&mut self, relay: String, cmd: &ClientMessage) { 112 let data = self.data.entry(relay).or_default(); 113 let msg_num_bytes = calculate_client_message_size(cmd); 114 match cmd { 115 ClientMessage::Req { sub_id, filters } => { 116 data.sub_data.insert( 117 sub_id.to_string(), 118 SubStats { 119 filter: filters_to_string(filters), 120 count: Default::default(), 121 }, 122 ); 123 } 124 125 ClientMessage::Close { sub_id } => { 126 data.sub_data.remove(sub_id); 127 } 128 129 _ => {} 130 } 131 132 data.count.up_sec_cur += msg_num_bytes; 133 134 data.events.push(RelayLogEvent::Send(cmd.clone())); 135 } 136 137 pub(crate) fn receive_cmd(&mut self, relay: String, cmd: RelayEvent) { 138 let data = self.data.entry(relay).or_default(); 139 let msg_num_bytes = calculate_relay_event_size(&cmd); 140 if let RelayEvent::Message(RelayMessage::Event(sid, _)) = cmd { 141 if let Some(sub_data) = data.sub_data.get_mut(sid) { 142 let c = &mut sub_data.count; 143 c.down_sec_cur += msg_num_bytes; 144 } 145 }; 146 147 data.count.down_sec_cur += msg_num_bytes; 148 149 data.events.push(RelayLogEvent::Recieve(cmd.into())); 150 } 151 152 pub fn try_increment_stats(&mut self) { 153 let cur_time = SystemTime::now(); 154 if let Ok(dur) = cur_time.duration_since(self.time_incd) { 155 if dur.as_secs() >= 1 { 156 self.time_incd = cur_time; 157 self.internal_inc_stats(); 158 } 159 } 160 } 161 162 fn internal_inc_stats(&mut self) { 163 for relay_data in self.data.values_mut() { 164 let c = &mut relay_data.count; 165 inc_data_count(c); 166 167 for sub in relay_data.sub_data.values_mut() { 168 inc_data_count(&mut sub.count); 169 } 170 } 171 } 172 } 173 174 fn inc_data_count(c: &mut TransferStats) { 175 c.up_total += c.up_sec_cur; 176 c.up_sec_prior = c.up_sec_cur; 177 178 c.down_total += c.down_sec_cur; 179 c.down_sec_prior = c.down_sec_cur; 180 181 c.up_sec_cur = 0; 182 c.down_sec_cur = 0; 183 } 184 185 fn calculate_client_message_size(message: &ClientMessage) -> usize { 186 match message { 187 ClientMessage::Event(note) => note.note_json.len() + 10, // 10 is ["EVENT",] 188 ClientMessage::Req { sub_id, filters } => { 189 mem::size_of_val(message) 190 + mem::size_of_val(sub_id) 191 + sub_id.len() 192 + filters.iter().map(mem::size_of_val).sum::<usize>() 193 } 194 ClientMessage::Close { sub_id } => { 195 mem::size_of_val(message) + mem::size_of_val(sub_id) + sub_id.len() 196 } 197 ClientMessage::Raw(data) => mem::size_of_val(message) + data.len(), 198 } 199 } 200 201 fn calculate_relay_event_size(event: &RelayEvent<'_>) -> usize { 202 let base_size = mem::size_of_val(event); // Size of the enum on the stack 203 204 let variant_size = match event { 205 RelayEvent::Opened | RelayEvent::Closed => 0, // No additional data 206 RelayEvent::Other(ws_message) => calculate_ws_message_size(ws_message), 207 RelayEvent::Error(error) => calculate_error_size(error), 208 RelayEvent::Message(message) => calculate_relay_message_size(message), 209 }; 210 211 base_size + variant_size 212 } 213 214 fn calculate_ws_message_size(message: &WsMessage) -> usize { 215 match message { 216 WsMessage::Binary(vec) | WsMessage::Ping(vec) | WsMessage::Pong(vec) => { 217 mem::size_of_val(message) + vec.len() 218 } 219 WsMessage::Text(string) | WsMessage::Unknown(string) => { 220 mem::size_of_val(message) + string.len() 221 } 222 } 223 } 224 225 fn calculate_error_size(error: &Error) -> usize { 226 match error { 227 Error::Empty 228 | Error::DecodeFailed 229 | Error::HexDecodeFailed 230 | Error::InvalidBech32 231 | Error::InvalidByteSize 232 | Error::InvalidSignature 233 | Error::Io(_) 234 | Error::InvalidPublicKey => mem::size_of_val(error), // No heap usage 235 236 Error::Json(json_err) => mem::size_of_val(error) + json_err.to_string().len(), 237 238 Error::Nostrdb(nostrdb_err) => mem::size_of_val(error) + nostrdb_err.to_string().len(), 239 240 Error::Generic(string) => mem::size_of_val(error) + string.len(), 241 } 242 } 243 244 fn calculate_relay_message_size(message: &RelayMessage) -> usize { 245 match message { 246 RelayMessage::OK(result) => calculate_command_result_size(result), 247 RelayMessage::Eose(str_ref) 248 | RelayMessage::Event(str_ref, _) 249 | RelayMessage::Notice(str_ref) => mem::size_of_val(message) + str_ref.len(), 250 } 251 } 252 253 fn filters_to_string(f: &Vec<Filter>) -> String { 254 let mut cur_str = String::new(); 255 for filter in f { 256 if let Ok(json) = filter.json() { 257 if !cur_str.is_empty() { 258 cur_str.push_str(", "); 259 } 260 cur_str.push_str(&json); 261 } 262 } 263 264 cur_str 265 }