commit 620e22769977200dc19586310bea30a3a3089608
parent 14e59ed278f52eb7347fa374c67a439a79971c99
Author: Greg Heartsfield <scsibug@imap.cc>
Date: Sat, 1 Jan 2022 08:08:54 -0600
fix: connection issues with Firefox
This adds Hyper, and a 200 response code. Prior to this, Firefox
would fail to connect. There is also a text document displayed at the
root URL to indicate this is a Nostr relay.
Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/15
Diffstat:
4 files changed, 266 insertions(+), 30 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -356,6 +356,25 @@ dependencies = [
]
[[package]]
+name = "h2"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd"
+dependencies = [
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "futures-util",
+ "http",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -400,18 +419,59 @@ dependencies = [
]
[[package]]
+name = "http-body"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
+dependencies = [
+ "bytes",
+ "http",
+ "pin-project-lite",
+]
+
+[[package]]
name = "httparse"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
[[package]]
+name = "httpdate"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
+
+[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
+name = "hyper"
+version = "0.14.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower-service",
+ "tracing",
+ "want",
+]
+
+[[package]]
name = "idna"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -423,6 +483,16 @@ dependencies = [
]
[[package]]
+name = "indexmap"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
+dependencies = [
+ "autocfg 1.0.1",
+ "hashbrown",
+]
+
+[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -582,6 +652,7 @@ dependencies = [
"futures-util",
"governor",
"hex",
+ "hyper",
"lazy_static",
"log",
"nonzero_ext",
@@ -1062,6 +1133,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
+name = "socket2"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
+[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1166,6 +1247,20 @@ dependencies = [
]
[[package]]
+name = "tokio-util"
+version = "0.6.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "log",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1175,6 +1270,38 @@ dependencies = [
]
[[package]]
+name = "tower-service"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
+
+[[package]]
+name = "tracing"
+version = "0.1.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
+dependencies = [
+ "cfg-if",
+ "pin-project-lite",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-core"
+version = "0.1.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
+dependencies = [
+ "lazy_static",
+]
+
+[[package]]
+name = "try-lock"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
+
+[[package]]
name = "tungstenite"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1260,6 +1387,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
+name = "want"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
+dependencies = [
+ "log",
+ "try-lock",
+]
+
+[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
@@ -23,3 +23,4 @@ rusqlite = "^0.26"
lazy_static = "^1.4"
governor = "^0.4"
nonzero_ext = "^0.3"
+hyper={ version="0.14", features=["server","http1","http2","tcp"] }
diff --git a/src/main.rs b/src/main.rs
@@ -1,6 +1,11 @@
//! Server process
use futures::SinkExt;
use futures::StreamExt;
+use hyper::service::{make_service_fn, service_fn};
+use hyper::upgrade::Upgraded;
+use hyper::{
+ header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
+};
use log::*;
use nostr_rs_relay::close::Close;
use nostr_rs_relay::config;
@@ -12,14 +17,17 @@ use nostr_rs_relay::protostream;
use nostr_rs_relay::protostream::NostrMessage::*;
use nostr_rs_relay::protostream::NostrResponse::*;
use std::collections::HashMap;
+use std::convert::Infallible;
use std::env;
+use std::net::SocketAddr;
use std::path::Path;
-use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Builder;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
+use tokio_tungstenite::WebSocketStream;
+use tungstenite::handshake;
use tungstenite::protocol::WebSocketConfig;
fn db_from_args(args: Vec<String>) -> Option<String> {
@@ -28,6 +36,88 @@ fn db_from_args(args: Vec<String>) -> Option<String> {
}
None
}
+async fn handle_web_request(
+ mut request: Request<Body>,
+ remote_addr: SocketAddr,
+ broadcast: Sender<Event>,
+ event_tx: tokio::sync::mpsc::Sender<Event>,
+ shutdown: Receiver<()>,
+) -> Result<Response<Body>, Infallible> {
+ match (
+ request.uri().path(),
+ request.headers().contains_key(header::UPGRADE),
+ ) {
+ //if the request is ws_echo and the request headers contains an Upgrade key
+ ("/", true) => {
+ debug!("websocket with upgrade request");
+ //assume request is a handshake, so create the handshake response
+ let response = match handshake::server::create_response_with_body(&request, || {
+ Body::empty()
+ }) {
+ Ok(response) => {
+ //in case the handshake response creation succeeds,
+ //spawn a task to handle the websocket connection
+ tokio::spawn(async move {
+ //using the hyper feature of upgrading a connection
+ match upgrade::on(&mut request).await {
+ //if successfully upgraded
+ Ok(upgraded) => {
+ //create a websocket stream from the upgraded object
+ let ws_stream = WebSocketStream::from_raw_socket(
+ //pass the upgraded object
+ //as the base layer stream of the Websocket
+ upgraded,
+ tokio_tungstenite::tungstenite::protocol::Role::Server,
+ None,
+ )
+ .await;
+ tokio::spawn(nostr_server(
+ ws_stream, broadcast, event_tx, shutdown,
+ ));
+ }
+ Err(e) => println!(
+ "error when trying to upgrade connection \
+ from address {} to websocket connection. \
+ Error is: {}",
+ remote_addr, e
+ ),
+ }
+ });
+ //return the response to the handshake request
+ response
+ }
+ Err(error) => {
+ warn!("websocket response failed");
+ let mut res =
+ Response::new(Body::from(format!("Failed to create websocket: {}", error)));
+ *res.status_mut() = StatusCode::BAD_REQUEST;
+ return Ok(res);
+ }
+ };
+ Ok::<_, Infallible>(response)
+ }
+ ("/", false) => {
+ // handle request at root with no upgrade header
+ Ok(Response::new(Body::from(format!(
+ "This is a Nostr relay.\n"
+ ))))
+ }
+ (_, _) => {
+ //handle any other url
+ Ok(Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::from("Nothing here."))
+ .unwrap())
+ }
+ }
+}
+
+async fn shutdown_signal() {
+ // Wait for the CTRL+C signal
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+}
/// Start running a Nostr relay server.
fn main() -> Result<(), Error> {
@@ -46,6 +136,7 @@ fn main() -> Result<(), Error> {
}
*settings = c;
}
+
let config = config::SETTINGS.read().unwrap();
// do some config validation.
if !Path::new(&config.database.data_directory).is_dir() {
@@ -54,6 +145,7 @@ fn main() -> Result<(), Error> {
}
debug!("config: {:?}", config);
let addr = format!("{}:{}", config.network.address.trim(), config.network.port);
+ let socket_addr = addr.parse().expect("listening address not valid");
// configure tokio runtime
let rt = Builder::new_multi_thread()
.enable_all()
@@ -63,8 +155,7 @@ fn main() -> Result<(), Error> {
// start tokio
rt.block_on(async {
let settings = config::SETTINGS.read().unwrap();
- let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
- info!("listening on: {}", addr);
+ info!("listening on: {}", socket_addr);
// all client-submitted valid events are broadcast to every
// other client on this channel. This should be large enough
// to accomodate slower readers (messages are dropped if
@@ -77,7 +168,7 @@ fn main() -> Result<(), Error> {
// requested server shutdown.
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
let ctrl_c_shutdown = invoke_shutdown.clone();
- // listen for ctrl-c interruupts
+ // // listen for ctrl-c interruupts
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("shutting down due to SIGINT");
@@ -87,28 +178,35 @@ fn main() -> Result<(), Error> {
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await;
-
- // track unique client connection count
- let mut client_accept_count: usize = 0;
- let mut stop_listening = invoke_shutdown.subscribe();
- // handle new client connection requests, or SIGINT signals.
- loop {
- tokio::select! {
- _ = stop_listening.recv() => {
- break;
- }
- Ok((stream, _)) = listener.accept() => {
- client_accept_count += 1;
- info!("creating new connection for client #{}",client_accept_count);
- tokio::spawn(nostr_server(
- stream,
- bcast_tx.clone(),
- event_tx.clone(),
- invoke_shutdown.subscribe(),
- ));
- }
+ info!("db writer created");
+ // A `Service` is needed for every connection, so this
+ // creates one from our `handle_request` function.
+ let make_svc = make_service_fn(|conn: &AddrStream| {
+ let remote_addr = conn.remote_addr();
+ let bcast = bcast_tx.clone();
+ let event = event_tx.clone();
+ let stop = invoke_shutdown.clone();
+ async move {
+ // service_fn converts our function into a `Service`
+ Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+ handle_web_request(
+ request,
+ remote_addr,
+ bcast.clone(),
+ event.clone(),
+ stop.subscribe(),
+ )
+ }))
}
+ });
+ let server = Server::bind(&socket_addr)
+ .serve(make_svc)
+ .with_graceful_shutdown(shutdown_signal());
+ // run hyper
+ if let Err(e) = server.await {
+ eprintln!("server error: {}", e);
}
+ // our code
});
Ok(())
}
@@ -116,7 +214,7 @@ fn main() -> Result<(), Error> {
/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
- stream: TcpStream,
+ ws_stream: WebSocketStream<Upgraded>,
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<Event>,
mut shutdown: Receiver<()>,
@@ -130,8 +228,8 @@ async fn nostr_server(
config.max_frame_size = settings.limits.max_ws_frame_bytes;
}
// upgrade the TCP connection to WebSocket
- let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
- let ws_stream = conn.expect("websocket handshake error");
+ //let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
+ //let ws_stream = conn.expect("websocket handshake error");
// wrap websocket into a stream & sink of Nostr protocol messages
let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream);
// Track internal client state
diff --git a/src/protostream.rs b/src/protostream.rs
@@ -9,9 +9,9 @@ use futures::sink::Sink;
use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
+use hyper::upgrade::Upgraded;
use log::*;
use serde::{Deserialize, Serialize};
-use tokio::net::TcpStream;
use tokio_tungstenite::WebSocketStream;
use tungstenite::error::Error as WsError;
use tungstenite::protocol::Message;
@@ -40,11 +40,11 @@ pub enum NostrResponse {
/// A Nostr protocol stream is layered on top of a Websocket stream.
pub struct NostrStream {
- ws_stream: WebSocketStream<TcpStream>,
+ ws_stream: WebSocketStream<Upgraded>,
}
/// Given a websocket, return a protocol stream wrapper.
-pub fn wrap_ws_in_nostr(ws: WebSocketStream<TcpStream>) -> NostrStream {
+pub fn wrap_ws_in_nostr(ws: WebSocketStream<Upgraded>) -> NostrStream {
NostrStream { ws_stream: ws }
}