subs_debug.rs (8258B)
1 use std::{collections::HashMap, mem, time::SystemTime}; 2 3 use ewebsock::WsMessage; 4 use nostrdb::Filter; 5 6 use crate::{ClientMessage, Error, RelayEvent, RelayMessage}; 7 8 use super::message::calculate_command_result_size; 9 10 type RelayId = String; 11 type SubId = String; 12 13 pub struct SubsDebug { 14 data: HashMap<RelayId, RelayStats>, 15 time_incd: SystemTime, 16 pub relay_events_selection: Option<RelayId>, 17 } 18 19 #[derive(Default)] 20 pub struct RelayStats { 21 pub count: TransferStats, 22 pub events: Vec<RelayLogEvent>, 23 pub sub_data: HashMap<SubId, SubStats>, 24 } 25 26 #[derive(Clone)] 27 pub enum RelayLogEvent { 28 Send(ClientMessage), 29 Recieve(OwnedRelayEvent), 30 } 31 32 #[derive(Clone)] 33 pub enum OwnedRelayEvent { 34 Opened, 35 Closed, 36 Other(String), 37 Error(String), 38 Message(String), 39 } 40 41 impl From<RelayEvent<'_>> for OwnedRelayEvent { 42 fn from(value: RelayEvent<'_>) -> Self { 43 match value { 44 RelayEvent::Opened => OwnedRelayEvent::Opened, 45 RelayEvent::Closed => OwnedRelayEvent::Closed, 46 RelayEvent::Other(ws_message) => { 47 let ws_str = match ws_message { 48 WsMessage::Binary(_) => "Binary".to_owned(), 49 WsMessage::Text(t) => format!("Text:{t}"), 50 WsMessage::Unknown(u) => format!("Unknown:{u}"), 51 WsMessage::Ping(_) => "Ping".to_owned(), 52 WsMessage::Pong(_) => "Pong".to_owned(), 53 }; 54 OwnedRelayEvent::Other(ws_str) 55 } 56 RelayEvent::Error(error) => OwnedRelayEvent::Error(error.to_string()), 57 RelayEvent::Message(relay_message) => { 58 let relay_msg = match relay_message { 59 RelayMessage::OK(_) => "OK".to_owned(), 60 RelayMessage::Eose(s) => format!("EOSE:{s}"), 61 RelayMessage::Event(_, s) => format!("EVENT:{s}"), 62 RelayMessage::Notice(s) => format!("NOTICE:{s}"), 63 RelayMessage::Closed(sub_id, message) => { 64 format!("CLOSED:{sub_id}:{message}") 65 } 66 }; 67 OwnedRelayEvent::Message(relay_msg) 68 } 69 } 70 } 71 } 72 73 #[derive(PartialEq, Eq, Hash, Clone)] 74 pub struct _RelaySub { 75 pub(crate) subid: String, 76 pub(crate) filter: String, 77 } 78 79 #[derive(Default)] 80 pub struct SubStats { 81 pub filter: String, 82 pub count: TransferStats, 83 } 84 85 #[derive(Default)] 86 pub struct TransferStats { 87 pub up_total: usize, 88 pub down_total: usize, 89 90 // 1 sec < last tick < 2 sec 91 pub up_sec_prior: usize, 92 pub down_sec_prior: usize, 93 94 // < 1 sec since last tick 95 up_sec_cur: usize, 96 down_sec_cur: usize, 97 } 98 99 impl Default for SubsDebug { 100 fn default() -> Self { 101 Self { 102 data: Default::default(), 103 time_incd: SystemTime::now(), 104 relay_events_selection: None, 105 } 106 } 107 } 108 109 impl SubsDebug { 110 pub fn get_data(&self) -> &HashMap<RelayId, RelayStats> { 111 &self.data 112 } 113 114 pub(crate) fn send_cmd(&mut self, relay: String, cmd: &ClientMessage) { 115 let data = self.data.entry(relay).or_default(); 116 let msg_num_bytes = calculate_client_message_size(cmd); 117 match cmd { 118 ClientMessage::Req { sub_id, filters } => { 119 data.sub_data.insert( 120 sub_id.to_string(), 121 SubStats { 122 filter: filters_to_string(filters), 123 count: Default::default(), 124 }, 125 ); 126 } 127 128 ClientMessage::Close { sub_id } => { 129 data.sub_data.remove(sub_id); 130 } 131 132 _ => {} 133 } 134 135 data.count.up_sec_cur += msg_num_bytes; 136 137 data.events.push(RelayLogEvent::Send(cmd.clone())); 138 } 139 140 pub(crate) fn receive_cmd(&mut self, relay: String, cmd: RelayEvent) { 141 let data = self.data.entry(relay).or_default(); 142 let msg_num_bytes = calculate_relay_event_size(&cmd); 143 if let RelayEvent::Message(RelayMessage::Event(sid, _)) = cmd { 144 if let Some(sub_data) = data.sub_data.get_mut(sid) { 145 let c = &mut sub_data.count; 146 c.down_sec_cur += msg_num_bytes; 147 } 148 }; 149 150 data.count.down_sec_cur += msg_num_bytes; 151 152 data.events.push(RelayLogEvent::Recieve(cmd.into())); 153 } 154 155 pub fn try_increment_stats(&mut self) { 156 let cur_time = SystemTime::now(); 157 if let Ok(dur) = cur_time.duration_since(self.time_incd) { 158 if dur.as_secs() >= 1 { 159 self.time_incd = cur_time; 160 self.internal_inc_stats(); 161 } 162 } 163 } 164 165 fn internal_inc_stats(&mut self) { 166 for relay_data in self.data.values_mut() { 167 let c = &mut relay_data.count; 168 inc_data_count(c); 169 170 for sub in relay_data.sub_data.values_mut() { 171 inc_data_count(&mut sub.count); 172 } 173 } 174 } 175 } 176 177 fn inc_data_count(c: &mut TransferStats) { 178 c.up_total += c.up_sec_cur; 179 c.up_sec_prior = c.up_sec_cur; 180 181 c.down_total += c.down_sec_cur; 182 c.down_sec_prior = c.down_sec_cur; 183 184 c.up_sec_cur = 0; 185 c.down_sec_cur = 0; 186 } 187 188 fn calculate_client_message_size(message: &ClientMessage) -> usize { 189 match message { 190 ClientMessage::Event(note) => note.note_json.len() + 10, // 10 is ["EVENT",] 191 ClientMessage::Req { sub_id, filters } => { 192 mem::size_of_val(message) 193 + mem::size_of_val(sub_id) 194 + sub_id.len() 195 + filters.iter().map(mem::size_of_val).sum::<usize>() 196 } 197 ClientMessage::Close { sub_id } => { 198 mem::size_of_val(message) + mem::size_of_val(sub_id) + sub_id.len() 199 } 200 ClientMessage::Raw(data) => mem::size_of_val(message) + data.len(), 201 } 202 } 203 204 fn calculate_relay_event_size(event: &RelayEvent<'_>) -> usize { 205 let base_size = mem::size_of_val(event); // Size of the enum on the stack 206 207 let variant_size = match event { 208 RelayEvent::Opened | RelayEvent::Closed => 0, // No additional data 209 RelayEvent::Other(ws_message) => calculate_ws_message_size(ws_message), 210 RelayEvent::Error(error) => calculate_error_size(error), 211 RelayEvent::Message(message) => calculate_relay_message_size(message), 212 }; 213 214 base_size + variant_size 215 } 216 217 fn calculate_ws_message_size(message: &WsMessage) -> usize { 218 match message { 219 WsMessage::Binary(vec) | WsMessage::Ping(vec) | WsMessage::Pong(vec) => { 220 mem::size_of_val(message) + vec.len() 221 } 222 WsMessage::Text(string) | WsMessage::Unknown(string) => { 223 mem::size_of_val(message) + string.len() 224 } 225 } 226 } 227 228 fn calculate_error_size(error: &Error) -> usize { 229 match error { 230 Error::Empty 231 | Error::HexDecodeFailed 232 | Error::InvalidBech32 233 | Error::InvalidByteSize 234 | Error::InvalidSignature 235 | Error::InvalidRelayUrl 236 | Error::Io(_) 237 | Error::InvalidPublicKey => mem::size_of_val(error), // No heap usage 238 239 Error::DecodeFailed(string) => mem::size_of_val(error) + string.len(), 240 241 Error::Json(json_err) => mem::size_of_val(error) + json_err.to_string().len(), 242 243 Error::Nostrdb(nostrdb_err) => mem::size_of_val(error) + nostrdb_err.to_string().len(), 244 245 Error::Generic(string) => mem::size_of_val(error) + string.len(), 246 } 247 } 248 249 fn calculate_relay_message_size(message: &RelayMessage) -> usize { 250 match message { 251 RelayMessage::OK(result) => calculate_command_result_size(result), 252 RelayMessage::Eose(str_ref) 253 | RelayMessage::Event(str_ref, _) 254 | RelayMessage::Notice(str_ref) => mem::size_of_val(message) + str_ref.len(), 255 RelayMessage::Closed(sub_id, reason) => { 256 mem::size_of_val(message) + sub_id.len() + reason.len() 257 } 258 } 259 } 260 261 fn filters_to_string(f: &Vec<Filter>) -> String { 262 let mut cur_str = String::new(); 263 for filter in f { 264 if let Ok(json) = filter.json() { 265 if !cur_str.is_empty() { 266 cur_str.push_str(", "); 267 } 268 cur_str.push_str(&json); 269 } 270 } 271 272 cur_str 273 }