nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

commit 8ecce3f5660d3cee805b13f9667cd56d237c3ac7
parent caffbbbede0f2eb66dff40611935f2094e8dc206
Author: Greg Heartsfield <scsibug@imap.cc>
Date:   Wed,  2 Nov 2022 18:33:44 -0500

feat: show client IP in logs

Diffstat:
Msrc/conn.rs | 14+++++++++++---
Msrc/db.rs | 4++--
Msrc/main.rs | 2+-
Msrc/schema.rs | 2+-
Msrc/server.rs | 33++++++++++++++++++++-------------
5 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/src/conn.rs b/src/conn.rs @@ -14,6 +14,8 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256; /// State for a client connection pub struct ClientConn { + /// Client IP (either from socket, or configured proxy header + client_ip: String, /// Unique client identifier generated at connection time client_id: Uuid, /// The current set of active client subscriptions @@ -24,16 +26,17 @@ pub struct ClientConn { impl Default for ClientConn { fn default() -> Self { - Self::new() + Self::new("unknown".to_owned()) } } impl ClientConn { /// Create a new, empty connection state. #[must_use] - pub fn new() -> Self { + pub fn new(client_ip: String) -> Self { let client_id = Uuid::new_v4(); ClientConn { + client_ip, client_id, subscriptions: HashMap::new(), max_subs: 32, @@ -47,6 +50,11 @@ impl ClientConn { self.client_id.to_string().chars().take(8).collect() } + #[must_use] + pub fn ip(&self) -> &str { + &self.client_ip + } + /// Find all matching subscriptions. #[must_use] pub fn get_matching_subscriptions(&self, e: &Event) -> Vec<&str> { @@ -102,7 +110,7 @@ impl ClientConn { // TODO: return notice if subscription did not exist. self.subscriptions.remove(&c.id); debug!( - "removed subscription, currently have {} active subs (cid={})", + "removed subscription, currently have {} active subs (cid={:?})", self.subscriptions.len(), self.client_id ); diff --git a/src/db.rs b/src/db.rs @@ -316,7 +316,7 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> { if is_lower_hex(tagval) && (tagval.len() % 2 == 0) { tx.execute( "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", - params![ev_id, &tagname, hex::decode(&tagval).ok()], + params![ev_id, &tagname, hex::decode(tagval).ok()], )?; } else { tx.execute( @@ -510,7 +510,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) { let mut blob_vals: Vec<Box<dyn ToSql>> = vec![]; for v in val { if (v.len() % 2 == 0) && is_lower_hex(v) { - if let Ok(h) = hex::decode(&v) { + if let Ok(h) = hex::decode(v) { blob_vals.push(Box::new(h)); } } else { diff --git a/src/main.rs b/src/main.rs @@ -34,11 +34,11 @@ fn main() { // enable tracing with tokio-console ConsoleLayer::builder().with_default_env().init(); } - // update with database location if let Some(db) = db_dir { settings.database.data_directory = db; } + let (_, ctrl_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel(); // run this in a new thread let handle = thread::spawn(|| { diff --git a/src/schema.rs b/src/schema.rs @@ -332,7 +332,7 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> { if (tagval.len() % 2 == 0) && is_lower_hex(tagval) { tx.execute( "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", - params![event_id, tagname, hex::decode(&tagval).ok()], + params![event_id, tagname, hex::decode(tagval).ok()], )?; } else { // otherwise, insert as text diff --git a/src/server.rs b/src/server.rs @@ -84,11 +84,14 @@ async fn handle_web_request( Some(config), ) .await; - + // spawn server with info... but include IP here. + let remote_ip = remote_addr.ip().to_string(); tokio::spawn(nostr_server( - pool, settings, ws_stream, broadcast, event_tx, shutdown, + pool, remote_ip, settings, ws_stream, broadcast, event_tx, + shutdown, )); } + // todo: trace, don't print... Err(e) => println!( "error when trying to upgrade connection \ from address {} to websocket connection. \ @@ -167,7 +170,6 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) { info!("Shutting down webserver due to SIGTERM"); break; }, - } } } @@ -386,6 +388,7 @@ fn make_notice_message(msg: &str) -> Message { /// for all client communication. async fn nostr_server( pool: db::SqlitePool, + remote_ip: String, settings: Settings, mut ws_stream: WebSocketStream<Upgraded>, broadcast: Sender<Event>, @@ -395,7 +398,8 @@ async fn nostr_server( // get a broadcast channel for clients to communicate on let mut bcast_rx = broadcast.subscribe(); // Track internal client state - let mut conn = conn::ClientConn::new(); + let mut conn = conn::ClientConn::new(remote_ip); + // Use the remote IP as the client identifier let cid = conn.get_client_prefix(); // Create a channel for receiving query results from the database. // we will send out the tx handle to any query we generate. @@ -424,11 +428,11 @@ async fn nostr_server( // and how many it received from queries. let mut client_published_event_count: usize = 0; let mut client_received_event_count: usize = 0; - info!("new connection for client: {:?}", cid); + info!("new connection for client: {:?}, ip: {:?}", cid, conn.ip()); loop { tokio::select! { _ = shutdown.recv() => { - info!("Shutting client connection down due to shutdown: {:?}", cid); + info!("Shutting client connection down due to shutdown: {:?}, ip: {:?}", cid, conn.ip()); // server shutting down, exit loop break; }, @@ -507,17 +511,17 @@ async fn nostr_server( Err(WsError::AlreadyClosed | WsError::ConnectionClosed | WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) => { - debug!("websocket close from client: {:?}",cid); + debug!("websocket close from client: {:?}, ip: {:?}",cid, conn.ip()); break; }, Some(Err(WsError::Io(e))) => { // IO errors are considered fatal - warn!("IO error (client: {:?}): {:?}", cid, e); + warn!("IO error (client: {:?}, ip: {:?}): {:?}", cid, conn.ip(), e); break; } x => { // default condition on error is to close the client connection - info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x); + info!("unknown error (client: {:?}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x); break; } }; @@ -546,7 +550,7 @@ async fn nostr_server( } }, Err(_) => { - info!("client {:?} sent an invalid event", cid); + info!("client: {:?} sent an invalid event", cid); ws_stream.send(make_notice_message("event was invalid")).await.ok(); } } @@ -592,7 +596,7 @@ async fn nostr_server( } }, Err(Error::ConnError) => { - debug!("got connection close/error, disconnecting client: {:?}",cid); + debug!("got connection close/error, disconnecting client: {:?}, ip: {:?}",cid, conn.ip()); break; } Err(Error::EventMaxLengthError(s)) => { @@ -615,7 +619,10 @@ async fn nostr_server( stop_tx.send(()).ok(); } info!( - "stopping connection for client: {:?} (client sent {} event(s), received {})", - cid, client_published_event_count, client_received_event_count + "stopping connection for client: {:?}, ip: {:?} (client sent {} event(s), received {})", + cid, + conn.ip(), + client_published_event_count, + client_received_event_count ); }