nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

commit 56c40f2be92750337ad8f81270142444c2937acc
parent cacd1ccb361be0227b9bd1f5b343f3039daf1ce6
Author: Greg Heartsfield <scsibug@imap.cc>
Date:   Sun, 12 Dec 2021 10:03:28 -0600

refactor: improve error messages

Diffstat:
Msrc/conn.rs | 18+++++++++++-------
Msrc/db.rs | 15+++++++--------
Msrc/main.rs | 58+++++++++++++++++++++++++++++++++++-----------------------
Msrc/protostream.rs | 2+-
4 files changed, 54 insertions(+), 39 deletions(-)

diff --git a/src/conn.rs b/src/conn.rs @@ -61,26 +61,30 @@ impl ClientConn { // prevent arbitrarily long subscription identifiers from // being used. if sub_id_len > MAX_SUBSCRIPTION_ID_LEN { - info!("Dropping subscription with huge ({}) length", sub_id_len); + info!( + "ignoring sub request with excessive length: ({})", + sub_id_len + ); return Ok(()); } // check if an existing subscription exists, and replace if so if self.subscriptions.contains_key(&k) { self.subscriptions.remove(&k); self.subscriptions.insert(k, s); - debug!("Replaced existing subscription"); + debug!("replaced existing subscription"); return Ok(()); } // check if there is room for another subscription. if self.subscriptions.len() >= self.max_subs { - info!("Client has reached the maximum number of unique subscriptions"); + // TODO: return error/notice for this + info!("client has reached the maximum number of unique subscriptions"); return Ok(()); } // add subscription self.subscriptions.insert(k, s); - info!( - "Registered new subscription, currently have {} active subs", + debug!( + "registered new subscription, currently have {} active subs", self.subscriptions.len() ); Ok(()) @@ -90,8 +94,8 @@ impl ClientConn { pub fn unsubscribe(&mut self, c: Close) { // TODO: return notice if subscription did not exist. self.subscriptions.remove(&c.id); - info!( - "Removed subscription, currently have {} active subs", + debug!( + "removed subscription, currently have {} active subs", self.subscriptions.len() ); } diff --git a/src/db.rs b/src/db.rs @@ -73,12 +73,12 @@ pub async fn db_writer( Path::new(DB_FILE), OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, )?; - info!("Opened database for writing"); + info!("opened database for writing"); // TODO: determine if we need to execute the init script. // TODO: check database app id / version before proceeding. match conn.execute_batch(INIT_SQL) { - Ok(()) => info!("init completed"), - Err(err) => info!("update failed: {}", err), + Ok(()) => info!("database pragma/schema initialized and ready"), + Err(err) => error!("update failed: {}", err), } loop { // call blocking read on channel @@ -88,13 +88,12 @@ pub async fn db_writer( break; } let event = next_event.unwrap(); - info!("Got event to write: {}", event.get_event_id_prefix()); match write_event(&mut conn, &event) { Ok(updated) => { if updated == 0 { info!("nothing inserted (dupe?)"); } else { - info!("persisted new event"); + info!("persisted event: {}", event.get_event_id_prefix()); } } Err(err) => { @@ -251,7 +250,7 @@ fn query_from_sub(sub: &Subscription) -> String { query.push_str(" WHERE "); query.push_str(&filter_clauses.join(" OR ")); } - debug!("Query: {}", query); + debug!("query string: {}", query); query } @@ -270,8 +269,8 @@ pub async fn db_query( let conn = Connection::open_with_flags(Path::new(DB_FILE), OpenFlags::SQLITE_OPEN_READ_ONLY) .unwrap(); - info!("Opened database for reading"); - info!("Going to query for: {:?}", sub); + debug!("opened database for reading"); + debug!("going to query for: {:?}", sub); // generate SQL query let q = query_from_sub(&sub); // execute the query diff --git a/src/main.rs b/src/main.rs @@ -35,7 +35,7 @@ fn main() -> Result<(), Error> { // start tokio rt.block_on(async { let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); - info!("Listening on: {}", addr); + info!("listening on: {}", addr); // all client-submitted valid events are broadcast to every // other client on this channel. This should be large enough // to accomodate slower readers (messages are dropped if @@ -53,7 +53,7 @@ fn main() -> Result<(), Error> { // listen for ctrl-c interruupts tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); - info!("Shutting down due to SIGINT"); + info!("shutting down due to SIGINT"); ctrl_c_shutdown.send(()).ok(); }); let mut stop_listening = invoke_shutdown.subscribe(); @@ -104,7 +104,11 @@ async fn nostr_server( // available to the executing query so it knows to stop. //let (abandon_query_tx, _) = oneshot::channel::<()>(); let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new(); - + // for stats, keep track of how many events the client published, + // and how many it received from queries. + let mut client_published_event_count: usize = 0; + let mut client_received_event_count: usize = 0; + info!("new connection for client: {}", cid); loop { tokio::select! { _ = shutdown.recv() => { @@ -112,23 +116,24 @@ async fn nostr_server( break; }, Some(query_result) = query_rx.recv() => { - info!("Got query result"); + // database informed us of a query result we asked for let res = EventRes(query_result.sub_id,query_result.event); + client_received_event_count += 1; nostr_stream.send(res).await.ok(); }, Ok(global_event) = bcast_rx.recv() => { - // ignoring closed broadcast errors, there will always be one sender available. - // Is there a subscription for this event? + // an event has been broadcast to all clients + // first check if there is a subscription for this event. let sub_name_opt = conn.get_matching_subscription(&global_event); - if sub_name_opt.is_some() { - let sub_name = sub_name_opt.unwrap(); - let event_str = serde_json::to_string(&global_event); - if event_str.is_ok() { - info!("sub match: client: {}, sub: {}, event: {}", - cid, sub_name, - global_event.get_event_id_prefix()); + if let Some(sub_name) = sub_name_opt { + // TODO: serialize at broadcast time, instead of + // once for each consumer. + if let Ok(event_str) = serde_json::to_string(&global_event) { + debug!("sub match: client: {}, sub: {}, event: {}", + cid, sub_name, + global_event.get_event_id_prefix()); // create an event response and send it - let res = EventRes(sub_name.to_owned(),event_str.unwrap()); + let res = EventRes(sub_name.to_owned(),event_str); nostr_stream.send(res).await.ok(); } else { warn!("could not convert event to string"); @@ -145,20 +150,21 @@ async fn nostr_server( match parsed { Ok(e) => { let id_prefix:String = e.id.chars().take(8).collect(); - info!("Successfully parsed/validated event: {} from client: {}", id_prefix, cid); + debug!("successfully parsed/validated event: {} from client: {}", id_prefix, cid); // Write this to the database event_tx.send(e.clone()).await.ok(); + client_published_event_count += 1; // send this event to everyone listening. let bcast_res = broadcast.send(e); if bcast_res.is_err() { - warn!("Could not send broadcast message: {:?}", bcast_res); + warn!("could not send broadcast message: {:?}", bcast_res); } }, - Err(_) => {info!("Client {} sent an invalid event", cid)} + Err(_) => {info!("client {} sent an invalid event", cid)} } }, Some(Ok(SubMsg(s))) => { - info!("Client {} requesting a subscription", cid); + debug!("client {} requesting a subscription", cid); // subscription handling consists of: // * registering the subscription so future events can be matched @@ -176,22 +182,25 @@ async fn nostr_server( let parsed : Result<Close> = Result::<Close>::from(cc); match parsed { Ok(c) => { + // check if a query is currently + // running, and remove it if so. let stop_tx = running_queries.remove(&c.id); if let Some(tx) = stop_tx { - info!("Removing query, telling DB to abandon query"); tx.send(()).ok(); } + // stop checking new events against + // the subscription conn.unsubscribe(c); }, - Err(_) => {info!("Invalid command ignored");} + Err(_) => {info!("invalid command ignored");} } }, None => { - info!("normal websocket close from client: {}",cid); + debug!("normal websocket close from client: {}",cid); break; }, Some(Err(Error::ConnError)) => { - info!("got connection close/error, disconnecting client: {}",cid); + debug!("got connection close/error, disconnecting client: {}",cid); break; } Some(Err(e)) => { @@ -205,5 +214,8 @@ async fn nostr_server( for (_, stop_tx) in running_queries.into_iter() { stop_tx.send(()).ok(); } - info!("stopping client connection: {}", cid); + info!( + "stopping connection for client: {} (client sent {} event(s), received {})", + cid, client_published_event_count, client_received_event_count + ); } diff --git a/src/protostream.rs b/src/protostream.rs @@ -57,7 +57,7 @@ impl Stream for NostrStream { match parsed_res { Ok(m) => Ok(m), Err(e) => { - debug!("Proto parse error: {:?}", e); + debug!("proto parse error: {:?}", e); Err(Error::ProtoParseError) } }