conn.rs (3311B)
1 //! Client connection state 2 use crate::close::Close; 3 use crate::error::Error; 4 use crate::error::Result; 5 6 use crate::subscription::Subscription; 7 use std::collections::HashMap; 8 use tracing::{debug, info}; 9 use uuid::Uuid; 10 11 /// A subscription identifier has a maximum length 12 const MAX_SUBSCRIPTION_ID_LEN: usize = 256; 13 14 /// State for a client connection 15 pub struct ClientConn { 16 /// Client IP (either from socket, or configured proxy header 17 client_ip: String, 18 /// Unique client identifier generated at connection time 19 client_id: Uuid, 20 /// The current set of active client subscriptions 21 subscriptions: HashMap<String, Subscription>, 22 /// Per-connection maximum concurrent subscriptions 23 max_subs: usize, 24 } 25 26 impl Default for ClientConn { 27 fn default() -> Self { 28 Self::new("unknown".to_owned()) 29 } 30 } 31 32 impl ClientConn { 33 /// Create a new, empty connection state. 34 #[must_use] 35 pub fn new(client_ip: String) -> Self { 36 let client_id = Uuid::new_v4(); 37 ClientConn { 38 client_ip, 39 client_id, 40 subscriptions: HashMap::new(), 41 max_subs: 32, 42 } 43 } 44 45 pub fn subscriptions(&self) -> &HashMap<String, Subscription> { 46 &self.subscriptions 47 } 48 49 /// Get a short prefix of the client's unique identifier, suitable 50 /// for logging. 51 #[must_use] 52 pub fn get_client_prefix(&self) -> String { 53 self.client_id.to_string().chars().take(8).collect() 54 } 55 56 #[must_use] 57 pub fn ip(&self) -> &str { 58 &self.client_ip 59 } 60 61 /// Add a new subscription for this connection. 62 /// # Errors 63 /// 64 /// Will return `Err` if the client has too many subscriptions, or 65 /// if the provided name is excessively long. 66 pub fn subscribe(&mut self, s: Subscription) -> Result<()> { 67 let k = s.get_id(); 68 let sub_id_len = k.len(); 69 // prevent arbitrarily long subscription identifiers from 70 // being used. 71 if sub_id_len > MAX_SUBSCRIPTION_ID_LEN { 72 info!( 73 "ignoring sub request with excessive length: ({})", 74 sub_id_len 75 ); 76 return Err(Error::SubIdMaxLengthError); 77 } 78 // check if an existing subscription exists, and replace if so 79 if self.subscriptions.contains_key(&k) { 80 self.subscriptions.remove(&k); 81 self.subscriptions.insert(k, s); 82 debug!("replaced existing subscription"); 83 return Ok(()); 84 } 85 86 // check if there is room for another subscription. 87 if self.subscriptions.len() >= self.max_subs { 88 return Err(Error::SubMaxExceededError); 89 } 90 // add subscription 91 self.subscriptions.insert(k, s); 92 debug!( 93 "registered new subscription, currently have {} active subs", 94 self.subscriptions.len() 95 ); 96 Ok(()) 97 } 98 99 /// Remove the subscription for this connection. 100 pub fn unsubscribe(&mut self, c: &Close) { 101 // TODO: return notice if subscription did not exist. 102 self.subscriptions.remove(&c.id); 103 debug!( 104 "removed subscription, currently have {} active subs (cid={})", 105 self.subscriptions.len(), 106 self.get_client_prefix(), 107 ); 108 } 109 }