server.rs (29938B)
1 //! Server process 2 use crate::close::Close; 3 use crate::close::CloseCmd; 4 use crate::config::{Settings, VerifiedUsersMode}; 5 use crate::conn; 6 use crate::db; 7 use crate::db::SubmittedEvent; 8 use crate::error::{Error, Result}; 9 use crate::event::Event; 10 use crate::event::EventCmd; 11 use crate::info::RelayInfo; 12 use crate::nip05; 13 use crate::subscription::Subscription; 14 use futures::SinkExt; 15 use futures::StreamExt; 16 use http::header::HeaderMap; 17 use hyper::header::ACCEPT; 18 use hyper::service::{make_service_fn, service_fn}; 19 use hyper::upgrade::Upgraded; 20 use hyper::{ 21 header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode, 22 }; 23 use serde::{Deserialize, Serialize}; 24 use serde_json::json; 25 use std::collections::HashMap; 26 use std::convert::Infallible; 27 use std::net::SocketAddr; 28 use std::path::Path; 29 use std::sync::mpsc::Receiver as MpscReceiver; 30 use std::time::Duration; 31 use std::time::Instant; 32 use tokio::runtime::Builder; 33 use tokio::sync::broadcast::{self, Receiver, Sender}; 34 use tokio::sync::mpsc; 35 use tokio::sync::oneshot; 36 use tokio_tungstenite::WebSocketStream; 37 use tracing::*; 38 use tungstenite::error::CapacityError::MessageTooLong; 39 use tungstenite::error::Error as WsError; 40 use tungstenite::handshake; 41 use tungstenite::protocol::Message; 42 use tungstenite::protocol::WebSocketConfig; 43 44 /// Handle arbitrary HTTP requests, including for WebSocket upgrades. 45 async fn handle_web_request( 46 mut request: Request<Body>, 47 pool: db::SqlitePool, 48 settings: Settings, 49 remote_addr: SocketAddr, 50 broadcast: Sender<Event>, 51 event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>, 52 shutdown: Receiver<()>, 53 ) -> Result<Response<Body>, Infallible> { 54 match ( 55 request.uri().path(), 56 request.headers().contains_key(header::UPGRADE), 57 ) { 58 // Request for / as websocket 59 ("/", true) => { 60 trace!("websocket with upgrade request"); 61 //assume request is a handshake, so create the handshake response 62 let response = match handshake::server::create_response_with_body(&request, || { 63 Body::empty() 64 }) { 65 Ok(response) => { 66 //in case the handshake response creation succeeds, 67 //spawn a task to handle the websocket connection 68 tokio::spawn(async move { 69 //using the hyper feature of upgrading a connection 70 match upgrade::on(&mut request).await { 71 //if successfully upgraded 72 Ok(upgraded) => { 73 // set WebSocket configuration options 74 let config = WebSocketConfig { 75 max_message_size: settings.limits.max_ws_message_bytes, 76 max_frame_size: settings.limits.max_ws_frame_bytes, 77 ..Default::default() 78 }; 79 //create a websocket stream from the upgraded object 80 let ws_stream = WebSocketStream::from_raw_socket( 81 //pass the upgraded object 82 //as the base layer stream of the Websocket 83 upgraded, 84 tokio_tungstenite::tungstenite::protocol::Role::Server, 85 Some(config), 86 ) 87 .await; 88 let user_agent = get_header_string("user-agent", request.headers()); 89 // determine the remote IP from headers if the exist 90 let header_ip = settings 91 .network 92 .remote_ip_header 93 .as_ref() 94 .and_then(|x| get_header_string(x, request.headers())); 95 // use the socket addr as a backup 96 let remote_ip = 97 header_ip.unwrap_or_else(|| remote_addr.ip().to_string()); 98 let client_info = ClientInfo { 99 remote_ip, 100 user_agent, 101 }; 102 // spawn a nostr server with our websocket 103 tokio::spawn(nostr_server( 104 pool, 105 client_info, 106 settings, 107 ws_stream, 108 broadcast, 109 event_tx, 110 shutdown, 111 )); 112 } 113 // todo: trace, don't print... 114 Err(e) => println!( 115 "error when trying to upgrade connection \ 116 from address {} to websocket connection. \ 117 Error is: {}", 118 remote_addr, e 119 ), 120 } 121 }); 122 //return the response to the handshake request 123 response 124 } 125 Err(error) => { 126 warn!("websocket response failed"); 127 let mut res = 128 Response::new(Body::from(format!("Failed to create websocket: {}", error))); 129 *res.status_mut() = StatusCode::BAD_REQUEST; 130 return Ok(res); 131 } 132 }; 133 Ok::<_, Infallible>(response) 134 } 135 // Request for Relay info 136 ("/", false) => { 137 // handle request at root with no upgrade header 138 // Check if this is a nostr server info request 139 let accept_header = &request.headers().get(ACCEPT); 140 // check if application/nostr+json is included 141 if let Some(media_types) = accept_header { 142 if let Ok(mt_str) = media_types.to_str() { 143 if mt_str.contains("application/nostr+json") { 144 // build a relay info response 145 debug!("Responding to server info request"); 146 let rinfo = RelayInfo::from(settings.info); 147 let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap()); 148 return Ok(Response::builder() 149 .status(200) 150 .header("Content-Type", "application/nostr+json") 151 .header("Access-Control-Allow-Origin", "*") 152 .body(b) 153 .unwrap()); 154 } 155 } 156 } 157 Ok(Response::builder() 158 .status(200) 159 .header("Content-Type", "text/plain") 160 .body(Body::from("Please use a Nostr client to connect.")) 161 .unwrap()) 162 } 163 (_, _) => { 164 //handle any other url 165 Ok(Response::builder() 166 .status(StatusCode::NOT_FOUND) 167 .body(Body::from("Nothing here.")) 168 .unwrap()) 169 } 170 } 171 } 172 173 fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> { 174 headers 175 .get(header) 176 .and_then(|x| x.to_str().ok().map(|x| x.to_string())) 177 } 178 179 // return on a control-c or internally requested shutdown signal 180 async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) { 181 let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) 182 .expect("could not define signal"); 183 loop { 184 tokio::select! { 185 _ = shutdown_signal.recv() => { 186 info!("Shutting down webserver as requested"); 187 // server shutting down, exit loop 188 break; 189 }, 190 _ = tokio::signal::ctrl_c() => { 191 info!("Shutting down webserver due to SIGINT"); 192 break; 193 }, 194 _ = term_signal.recv() => { 195 info!("Shutting down webserver due to SIGTERM"); 196 break; 197 }, 198 } 199 } 200 } 201 202 /// Start running a Nostr relay server. 203 pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> { 204 trace!("Config: {:?}", settings); 205 // do some config validation. 206 if !Path::new(&settings.database.data_directory).is_dir() { 207 error!("Database directory does not exist"); 208 return Err(Error::DatabaseDirError); 209 } 210 let addr = format!( 211 "{}:{}", 212 settings.network.address.trim(), 213 settings.network.port 214 ); 215 let socket_addr = addr.parse().expect("listening address not valid"); 216 // address whitelisting settings 217 if let Some(addr_whitelist) = &settings.authorization.pubkey_whitelist { 218 info!( 219 "Event publishing restricted to {} pubkey(s)", 220 addr_whitelist.len() 221 ); 222 } 223 // check if NIP-05 enforced user verification is on 224 if settings.verified_users.is_active() { 225 info!( 226 "NIP-05 user verification mode:{:?}", 227 settings.verified_users.mode 228 ); 229 if let Some(d) = settings.verified_users.verify_update_duration() { 230 info!("NIP-05 check user verification every: {:?}", d); 231 } 232 if let Some(d) = settings.verified_users.verify_expiration_duration() { 233 info!("NIP-05 user verification expires after: {:?}", d); 234 } 235 if let Some(wl) = &settings.verified_users.domain_whitelist { 236 info!("NIP-05 domain whitelist: {:?}", wl); 237 } 238 if let Some(bl) = &settings.verified_users.domain_blacklist { 239 info!("NIP-05 domain blacklist: {:?}", bl); 240 } 241 } 242 // configure tokio runtime 243 let rt = Builder::new_multi_thread() 244 .enable_all() 245 .thread_name("tokio-ws") 246 .build() 247 .unwrap(); 248 // start tokio 249 rt.block_on(async { 250 let broadcast_buffer_limit = settings.limits.broadcast_buffer; 251 let persist_buffer_limit = settings.limits.event_persist_buffer; 252 let verified_users_active = settings.verified_users.is_active(); 253 let db_min_conn = settings.database.min_conn; 254 let db_max_conn = settings.database.max_conn; 255 let settings = settings.clone(); 256 info!("listening on: {}", socket_addr); 257 // all client-submitted valid events are broadcast to every 258 // other client on this channel. This should be large enough 259 // to accomodate slower readers (messages are dropped if 260 // clients can not keep up). 261 let (bcast_tx, _) = broadcast::channel::<Event>(broadcast_buffer_limit); 262 // validated events that need to be persisted are sent to the 263 // database on via this channel. 264 let (event_tx, event_rx) = mpsc::channel::<SubmittedEvent>(persist_buffer_limit); 265 // establish a channel for letting all threads now about a 266 // requested server shutdown. 267 let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1); 268 // create a channel for sending any new metadata event. These 269 // will get processed relatively slowly (a potentially 270 // multi-second blocking HTTP call) on a single thread, so we 271 // buffer requests on the channel. No harm in dropping events 272 // here, since we are protecting against DoS. This can make 273 // it difficult to setup initial metadata in bulk, since 274 // overwhelming this will drop events and won't register 275 // metadata events. 276 let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096); 277 // start the database writer thread. Give it a channel for 278 // writing events, and for publishing events that have been 279 // written (to all connected clients). 280 db::db_writer( 281 settings.clone(), 282 event_rx, 283 bcast_tx.clone(), 284 metadata_tx.clone(), 285 shutdown_listen, 286 ) 287 .await; 288 info!("db writer created"); 289 290 // create a nip-05 verifier thread; if enabled. 291 if settings.verified_users.mode != VerifiedUsersMode::Disabled { 292 let verifier_opt = 293 nip05::Verifier::new(metadata_rx, bcast_tx.clone(), settings.clone()); 294 if let Ok(mut v) = verifier_opt { 295 if verified_users_active { 296 tokio::task::spawn(async move { 297 info!("starting up NIP-05 verifier..."); 298 v.run().await; 299 }); 300 } 301 } 302 } 303 // listen for (external to tokio) shutdown request 304 let controlled_shutdown = invoke_shutdown.clone(); 305 tokio::spawn(async move { 306 info!("control message listener started"); 307 match shutdown_rx.recv() { 308 Ok(()) => { 309 info!("control message requesting shutdown"); 310 controlled_shutdown.send(()).ok(); 311 } 312 Err(std::sync::mpsc::RecvError) => { 313 debug!("shutdown requestor is disconnected"); 314 } 315 }; 316 }); 317 // listen for ctrl-c interruupts 318 let ctrl_c_shutdown = invoke_shutdown.clone(); 319 // listener for webserver shutdown 320 let webserver_shutdown_listen = invoke_shutdown.subscribe(); 321 322 tokio::spawn(async move { 323 tokio::signal::ctrl_c().await.unwrap(); 324 info!("shutting down due to SIGINT (main)"); 325 ctrl_c_shutdown.send(()).ok(); 326 }); 327 // build a connection pool for sqlite connections 328 let pool = db::build_pool( 329 "client query", 330 &settings, 331 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY 332 | rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE, 333 db_min_conn, 334 db_max_conn, 335 true, 336 ); 337 // A `Service` is needed for every connection, so this 338 // creates one from our `handle_request` function. 339 let make_svc = make_service_fn(|conn: &AddrStream| { 340 let svc_pool = pool.clone(); 341 let remote_addr = conn.remote_addr(); 342 let bcast = bcast_tx.clone(); 343 let event = event_tx.clone(); 344 let stop = invoke_shutdown.clone(); 345 let settings = settings.clone(); 346 async move { 347 // service_fn converts our function into a `Service` 348 Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { 349 handle_web_request( 350 request, 351 svc_pool.clone(), 352 settings.clone(), 353 remote_addr, 354 bcast.clone(), 355 event.clone(), 356 stop.subscribe(), 357 ) 358 })) 359 } 360 }); 361 let server = Server::bind(&socket_addr) 362 .serve(make_svc) 363 .with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen)); 364 // run hyper in this thread. This is why the thread does not return. 365 if let Err(e) = server.await { 366 eprintln!("server error: {}", e); 367 } 368 }); 369 Ok(()) 370 } 371 372 /// Nostr protocol messages from a client 373 #[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Debug)] 374 #[serde(untagged)] 375 pub enum NostrMessage { 376 /// An `EVENT` message 377 EventMsg(EventCmd), 378 /// A `REQ` message 379 SubMsg(Subscription), 380 /// A `CLOSE` message 381 CloseMsg(CloseCmd), 382 } 383 384 /// Convert Message to NostrMessage 385 fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage> { 386 let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into()); 387 match parsed_res { 388 Ok(m) => { 389 if let NostrMessage::EventMsg(_) = m { 390 if let Some(max_size) = max_bytes { 391 // check length, ensure that some max size is set. 392 if msg.len() > max_size && max_size > 0 { 393 return Err(Error::EventMaxLengthError(msg.len())); 394 } 395 } 396 } 397 Ok(m) 398 } 399 Err(e) => { 400 debug!("proto parse error: {:?}", e); 401 debug!("parse error on message: {}", msg.trim()); 402 Err(Error::ProtoParseError) 403 } 404 } 405 } 406 407 /// Turn a string into a NOTICE message ready to send over a WebSocket 408 fn make_notice_message(msg: &str) -> Message { 409 Message::text(json!(["NOTICE", msg]).to_string()) 410 } 411 412 struct ClientInfo { 413 remote_ip: String, 414 user_agent: Option<String>, 415 } 416 417 /// Handle new client connections. This runs through an event loop 418 /// for all client communication. 419 async fn nostr_server( 420 pool: db::SqlitePool, 421 client_info: ClientInfo, 422 settings: Settings, 423 mut ws_stream: WebSocketStream<Upgraded>, 424 broadcast: Sender<Event>, 425 event_tx: mpsc::Sender<SubmittedEvent>, 426 mut shutdown: Receiver<()>, 427 ) { 428 // get a broadcast channel for clients to communicate on 429 let mut bcast_rx = broadcast.subscribe(); 430 // Track internal client state 431 let mut conn = conn::ClientConn::new(client_info.remote_ip); 432 // Use the remote IP as the client identifier 433 let cid = conn.get_client_prefix(); 434 // Create a channel for receiving query results from the database. 435 // we will send out the tx handle to any query we generate. 436 let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256); 437 // Create channel for receiving NOTICEs 438 let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32); 439 440 // last time this client sent data (message, ping, etc.) 441 let mut last_message_time = Instant::now(); 442 443 // ping interval (every 5 minutes) 444 let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into()); 445 446 // disconnect after 20 minutes without a ping response or event. 447 let max_quiet_time = Duration::from_secs(60 * 20); 448 449 let start = tokio::time::Instant::now() + default_ping_dur; 450 let mut ping_interval = tokio::time::interval_at(start, default_ping_dur); 451 452 // maintain a hashmap of a oneshot channel for active subscriptions. 453 // when these subscriptions are cancelled, make a message 454 // available to the executing query so it knows to stop. 455 let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new(); 456 457 // for stats, keep track of how many events the client published, 458 // and how many it received from queries. 459 let mut client_published_event_count: usize = 0; 460 let mut client_received_event_count: usize = 0; 461 debug!("new connection for client: {}, ip: {:?}", cid, conn.ip()); 462 if let Some(ua) = client_info.user_agent { 463 debug!("client: {} has user-agent: {:?}", cid, ua); 464 } 465 loop { 466 tokio::select! { 467 _ = shutdown.recv() => { 468 info!("Close connection down due to shutdown, client: {}, ip: {:?}", cid, conn.ip()); 469 // server shutting down, exit loop 470 break; 471 }, 472 _ = ping_interval.tick() => { 473 // check how long since we talked to client 474 // if it has been too long, disconnect 475 if last_message_time.elapsed() > max_quiet_time { 476 debug!("ending connection due to lack of client ping response"); 477 break; 478 } 479 // Send a ping 480 ws_stream.send(Message::Ping(Vec::new())).await.ok(); 481 }, 482 Some(notice_msg) = notice_rx.recv() => { 483 ws_stream.send(make_notice_message(¬ice_msg)).await.ok(); 484 }, 485 Some(query_result) = query_rx.recv() => { 486 // database informed us of a query result we asked for 487 let subesc = query_result.sub_id.replace('"', ""); 488 if query_result.event == "EOSE" { 489 let send_str = format!("[\"EOSE\",\"{}\"]", subesc); 490 ws_stream.send(Message::Text(send_str)).await.ok(); 491 } else { 492 client_received_event_count += 1; 493 // send a result 494 let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event); 495 ws_stream.send(Message::Text(send_str)).await.ok(); 496 } 497 }, 498 // TODO: consider logging the LaggedRecv error 499 Ok(global_event) = bcast_rx.recv() => { 500 // an event has been broadcast to all clients 501 // first check if there is a subscription for this event. 502 for (s, sub) in conn.subscriptions() { 503 if !sub.interested_in_event(&global_event) { 504 continue; 505 } 506 507 // TODO: serialize at broadcast time, instead of 508 // once for each consumer. 509 if let Ok(event_str) = serde_json::to_string(&global_event) { 510 debug!("sub match for client: {}, sub: {:?}, event: {:?}", 511 cid, s, 512 global_event.get_event_id_prefix()); 513 // create an event response and send it 514 let subesc = s.replace('"', ""); 515 ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok(); 516 } else { 517 warn!("could not serialize event: {:?}", global_event.get_event_id_prefix()); 518 } 519 } 520 }, 521 ws_next = ws_stream.next() => { 522 // update most recent message time for client 523 last_message_time = Instant::now(); 524 // Consume text messages from the client, parse into Nostr messages. 525 let nostr_msg = match ws_next { 526 Some(Ok(Message::Text(m))) => { 527 convert_to_msg(m,settings.limits.max_event_bytes) 528 }, 529 Some(Ok(Message::Binary(_))) => { 530 ws_stream.send( 531 make_notice_message("binary messages are not accepted")).await.ok(); 532 continue; 533 }, 534 Some(Ok(Message::Ping(_) | Message::Pong(_))) => { 535 // get a ping/pong, ignore. tungstenite will 536 // send responses automatically. 537 continue; 538 }, 539 Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => { 540 ws_stream.send( 541 make_notice_message( 542 &format!("message too large ({} > {})",size, max_size))).await.ok(); 543 continue; 544 }, 545 None | 546 Some(Ok(Message::Close(_)) | 547 Err(WsError::AlreadyClosed | WsError::ConnectionClosed | 548 WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) 549 => { 550 debug!("websocket close from client: {}, ip: {:?}",cid, conn.ip()); 551 break; 552 }, 553 Some(Err(WsError::Io(e))) => { 554 // IO errors are considered fatal 555 warn!("IO error (client: {}, ip: {:?}): {:?}", cid, conn.ip(), e); 556 break; 557 } 558 x => { 559 // default condition on error is to close the client connection 560 info!("unknown error (client: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x); 561 break; 562 } 563 }; 564 565 // convert ws_next into proto_next 566 match nostr_msg { 567 Ok(NostrMessage::EventMsg(ec)) => { 568 // An EventCmd needs to be validated to be converted into an Event 569 // handle each type of message 570 let parsed : Result<Event> = Result::<Event>::from(ec); 571 match parsed { 572 Ok(e) => { 573 let id_prefix:String = e.id.chars().take(8).collect(); 574 debug!("successfully parsed/validated event: {:?} from client: {}", id_prefix, cid); 575 // check if the event is too far in the future. 576 if e.is_valid_timestamp(settings.options.reject_future_seconds) { 577 // Write this to the database. 578 let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() }; 579 event_tx.send(submit_event).await.ok(); 580 client_published_event_count += 1; 581 } else { 582 info!("client: {} sent a far future-dated event", cid); 583 if let Some(fut_sec) = settings.options.reject_future_seconds { 584 ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok(); 585 } 586 } 587 }, 588 Err(_) => { 589 info!("client: {} sent an invalid event", cid); 590 ws_stream.send(make_notice_message("event was invalid")).await.ok(); 591 } 592 } 593 }, 594 Ok(NostrMessage::SubMsg(s)) => { 595 debug!("client: {} requesting a subscription", cid); 596 // subscription handling consists of: 597 // * registering the subscription so future events can be matched 598 // * making a channel to cancel to request later 599 // * sending a request for a SQL query 600 let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); 601 match conn.subscribe(s.clone()) { 602 Ok(()) => { 603 // when we insert, if there was a previous query running with the same name, cancel it. 604 if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) { 605 previous_query.send(()).ok(); 606 } 607 // start a database query 608 db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await; 609 }, 610 Err(e) => { 611 info!("Subscription error: {}", e); 612 ws_stream.send(make_notice_message(&e.to_string())).await.ok(); 613 } 614 } 615 }, 616 Ok(NostrMessage::CloseMsg(cc)) => { 617 // closing a request simply removes the subscription. 618 let parsed : Result<Close> = Result::<Close>::from(cc); 619 if let Ok(c) = parsed { 620 // check if a query is currently 621 // running, and remove it if so. 622 let stop_tx = running_queries.remove(&c.id); 623 if let Some(tx) = stop_tx { 624 tx.send(()).ok(); 625 } 626 // stop checking new events against 627 // the subscription 628 conn.unsubscribe(&c); 629 } else { 630 info!("invalid command ignored"); 631 ws_stream.send(make_notice_message("could not parse command")).await.ok(); 632 } 633 }, 634 Err(Error::ConnError) => { 635 debug!("got connection close/error, disconnecting client: {}, ip: {:?}",cid, conn.ip()); 636 break; 637 } 638 Err(Error::EventMaxLengthError(s)) => { 639 info!("client: {} sent event larger ({} bytes) than max size", cid, s); 640 ws_stream.send(make_notice_message("event exceeded max size")).await.ok(); 641 }, 642 Err(Error::ProtoParseError) => { 643 info!("client {} sent event that could not be parsed", cid); 644 ws_stream.send(make_notice_message("could not parse command")).await.ok(); 645 }, 646 Err(e) => { 647 info!("got non-fatal error from client: {}, error: {:?}", cid, e); 648 }, 649 } 650 }, 651 } 652 } 653 // connection cleanup - ensure any still running queries are terminated. 654 for (_, stop_tx) in running_queries { 655 stop_tx.send(()).ok(); 656 } 657 info!( 658 "stopping connection for client: {}, ip: {:?} (client sent {} event(s), received {})", 659 cid, 660 conn.ip(), 661 client_published_event_count, 662 client_received_event_count 663 ); 664 }