mod.rs (3847B)
1 use anyhow::{anyhow, Result}; 2 use nostr_rs_relay::config; 3 use nostr_rs_relay::server::start_server; 4 //use http::{Request, Response}; 5 use hyper::{Client, StatusCode, Uri}; 6 use std::net::TcpListener; 7 use std::sync::atomic::{AtomicU16, Ordering}; 8 use std::sync::mpsc as syncmpsc; 9 use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender}; 10 use std::thread; 11 use std::thread::JoinHandle; 12 use std::time::Duration; 13 use tracing::{debug, info}; 14 15 pub struct Relay { 16 pub port: u16, 17 pub handle: JoinHandle<()>, 18 pub shutdown_tx: MpscSender<()>, 19 } 20 21 pub fn start_relay() -> Result<Relay> { 22 // setup tracing 23 let _trace_sub = tracing_subscriber::fmt::try_init(); 24 info!("Starting a new relay"); 25 // replace default settings 26 let mut settings = config::Settings::default(); 27 // identify open port 28 info!("Checking for address..."); 29 let port = get_available_port().unwrap(); 30 info!("Found open port: {}", port); 31 // bind to local interface only 32 settings.network.address = "127.0.0.1".to_owned(); 33 settings.network.port = port; 34 // create an in-memory DB with multiple readers 35 settings.database.in_memory = true; 36 settings.database.min_conn = 4; 37 settings.database.max_conn = 8; 38 let (shutdown_tx, shutdown_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel(); 39 let handle = thread::spawn(|| { 40 // server will block the thread it is run on. 41 let _ = start_server(settings, shutdown_rx); 42 }); 43 // how do we know the relay has finished starting up? 44 Ok(Relay { 45 port, 46 handle, 47 shutdown_tx, 48 }) 49 } 50 51 // check if the server is healthy via HTTP request 52 async fn server_ready(relay: &Relay) -> Result<bool> { 53 let uri: String = format!("http://127.0.0.1:{}/", relay.port); 54 let client = Client::new(); 55 let uri: Uri = uri.parse().unwrap(); 56 let res = client.get(uri).await?; 57 Ok(res.status() == StatusCode::OK) 58 } 59 60 pub async fn wait_for_healthy_relay(relay: &Relay) -> Result<()> { 61 // TODO: maximum time to wait for server to become healthy. 62 // give it a little time to start up before we start polling 63 tokio::time::sleep(Duration::from_millis(10)).await; 64 loop { 65 let server_check = server_ready(relay).await; 66 match server_check { 67 Ok(true) => { 68 // server responded with 200-OK. 69 break; 70 } 71 Ok(false) => { 72 // server responded with an error, we're done. 73 return Err(anyhow!("Got non-200-OK from relay")); 74 } 75 Err(_) => { 76 // server is not yet ready, probably connection refused... 77 debug!("Relay not ready, will try again..."); 78 tokio::time::sleep(Duration::from_millis(10)).await; 79 } 80 } 81 } 82 info!("relay is ready"); 83 Ok(()) 84 // simple message sent to web browsers 85 //let mut request = Request::builder() 86 // .uri("https://www.rust-lang.org/") 87 // .header("User-Agent", "my-awesome-agent/1.0"); 88 } 89 90 // from https://elliotekj.com/posts/2017/07/25/find-available-tcp-port-rust/ 91 // This needed some modification; if multiple tasks all ask for open ports, they will tend to get the same one. 92 // instead we should try to try these incrementally/globally. 93 94 static PORT_COUNTER: AtomicU16 = AtomicU16::new(4030); 95 96 fn get_available_port() -> Option<u16> { 97 let startsearch = PORT_COUNTER.fetch_add(10, Ordering::SeqCst); 98 if startsearch >= 20000 { 99 // wrap around 100 PORT_COUNTER.store(4030, Ordering::Relaxed); 101 } 102 (startsearch..20000).find(|port| port_is_available(*port)) 103 } 104 pub fn port_is_available(port: u16) -> bool { 105 info!("checking on port {}", port); 106 match TcpListener::bind(("127.0.0.1", port)) { 107 Ok(_) => true, 108 Err(_) => false, 109 } 110 }