nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

mod.rs (3847B)


      1 use anyhow::{anyhow, Result};
      2 use nostr_rs_relay::config;
      3 use nostr_rs_relay::server::start_server;
      4 //use http::{Request, Response};
      5 use hyper::{Client, StatusCode, Uri};
      6 use std::net::TcpListener;
      7 use std::sync::atomic::{AtomicU16, Ordering};
      8 use std::sync::mpsc as syncmpsc;
      9 use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender};
     10 use std::thread;
     11 use std::thread::JoinHandle;
     12 use std::time::Duration;
     13 use tracing::{debug, info};
     14 
     15 pub struct Relay {
     16     pub port: u16,
     17     pub handle: JoinHandle<()>,
     18     pub shutdown_tx: MpscSender<()>,
     19 }
     20 
     21 pub fn start_relay() -> Result<Relay> {
     22     // setup tracing
     23     let _trace_sub = tracing_subscriber::fmt::try_init();
     24     info!("Starting a new relay");
     25     // replace default settings
     26     let mut settings = config::Settings::default();
     27     // identify open port
     28     info!("Checking for address...");
     29     let port = get_available_port().unwrap();
     30     info!("Found open port: {}", port);
     31     // bind to local interface only
     32     settings.network.address = "127.0.0.1".to_owned();
     33     settings.network.port = port;
     34     // create an in-memory DB with multiple readers
     35     settings.database.in_memory = true;
     36     settings.database.min_conn = 4;
     37     settings.database.max_conn = 8;
     38     let (shutdown_tx, shutdown_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel();
     39     let handle = thread::spawn(|| {
     40         // server will block the thread it is run on.
     41         let _ = start_server(settings, shutdown_rx);
     42     });
     43     // how do we know the relay has finished starting up?
     44     Ok(Relay {
     45         port,
     46         handle,
     47         shutdown_tx,
     48     })
     49 }
     50 
     51 // check if the server is healthy via HTTP request
     52 async fn server_ready(relay: &Relay) -> Result<bool> {
     53     let uri: String = format!("http://127.0.0.1:{}/", relay.port);
     54     let client = Client::new();
     55     let uri: Uri = uri.parse().unwrap();
     56     let res = client.get(uri).await?;
     57     Ok(res.status() == StatusCode::OK)
     58 }
     59 
     60 pub async fn wait_for_healthy_relay(relay: &Relay) -> Result<()> {
     61     // TODO: maximum time to wait for server to become healthy.
     62     // give it a little time to start up before we start polling
     63     tokio::time::sleep(Duration::from_millis(10)).await;
     64     loop {
     65         let server_check = server_ready(relay).await;
     66         match server_check {
     67             Ok(true) => {
     68                 // server responded with 200-OK.
     69                 break;
     70             }
     71             Ok(false) => {
     72                 // server responded with an error, we're done.
     73                 return Err(anyhow!("Got non-200-OK from relay"));
     74             }
     75             Err(_) => {
     76                 // server is not yet ready, probably connection refused...
     77                 debug!("Relay not ready, will try again...");
     78                 tokio::time::sleep(Duration::from_millis(10)).await;
     79             }
     80         }
     81     }
     82     info!("relay is ready");
     83     Ok(())
     84     // simple message sent to web browsers
     85     //let mut request = Request::builder()
     86     //    .uri("https://www.rust-lang.org/")
     87     //    .header("User-Agent", "my-awesome-agent/1.0");
     88 }
     89 
     90 // from https://elliotekj.com/posts/2017/07/25/find-available-tcp-port-rust/
     91 // This needed some modification; if multiple tasks all ask for open ports, they will tend to get the same one.
     92 // instead we should try to try these incrementally/globally.
     93 
     94 static PORT_COUNTER: AtomicU16 = AtomicU16::new(4030);
     95 
     96 fn get_available_port() -> Option<u16> {
     97     let startsearch = PORT_COUNTER.fetch_add(10, Ordering::SeqCst);
     98     if startsearch >= 20000 {
     99         // wrap around
    100         PORT_COUNTER.store(4030, Ordering::Relaxed);
    101     }
    102     (startsearch..20000).find(|port| port_is_available(*port))
    103 }
    104 pub fn port_is_available(port: u16) -> bool {
    105     info!("checking on port {}", port);
    106     match TcpListener::bind(("127.0.0.1", port)) {
    107         Ok(_) => true,
    108         Err(_) => false,
    109     }
    110 }