commit 8b4c43ae71b9d6a1b9703eadbb201a5b14e77d51
parent 35ceb7cb6471ae567c31668d6c81d6ad54fbf606
Author: Greg Heartsfield <scsibug@imap.cc>
Date: Sun, 5 Dec 2021 18:14:14 -0600
feat: add and remove subscriptions from client requests
A hashmap of active subscriptions is maintained for each client. REQ
and CLOSE commands will modify the subscription list.
Diffstat:
5 files changed, 89 insertions(+), 25 deletions(-)
diff --git a/src/close.rs b/src/close.rs
@@ -2,20 +2,28 @@ use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
-pub struct Close {
+pub struct CloseCmd {
cmd: String,
id: String,
}
-impl Close {
- pub fn parse(json: &str) -> Result<Close> {
- let c: Close = serde_json::from_str(json)?; //.map_err(|e| Error::JsonParseFailed(e));
- if c.cmd != "CLOSE" {
- return Err(Error::CloseParseFailed);
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+pub struct Close {
+ id: String,
+}
+
+impl From<CloseCmd> for Result<Close> {
+ fn from(cc: CloseCmd) -> Result<Close> {
+ // ensure command is correct
+ if cc.cmd != "CLOSE" {
+ return Err(Error::CommandUnknownError);
+ } else {
+ return Ok(Close { id: cc.id });
}
- return Ok(c);
}
+}
+impl Close {
pub fn get_id(&self) -> String {
self.id.clone()
}
diff --git a/src/conn.rs b/src/conn.rs
@@ -1,14 +1,21 @@
-//use std::collections::HashMap;
+use crate::close::Close;
+use crate::error::Result;
+use crate::subscription::Subscription;
+use log::*;
+use std::collections::HashMap;
use uuid::Uuid;
+// subscription identifiers must be reasonably sized.
+const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
+
// state for a client connection
pub struct ClientConn {
_client_id: Uuid,
// current set of subscriptions
- //subscriptions: HashMap<String, Subscription>,
+ subscriptions: HashMap<String, Subscription>,
// websocket
//stream: WebSocketStream<TcpStream>,
- _max_subs: usize,
+ max_subs: usize,
}
impl ClientConn {
@@ -16,7 +23,46 @@ impl ClientConn {
let client_id = Uuid::new_v4();
ClientConn {
_client_id: client_id,
- _max_subs: 128,
+ subscriptions: HashMap::new(),
+ max_subs: 128,
+ }
+ }
+
+ pub fn subscribe(&mut self, s: Subscription) -> Result<()> {
+ let k = s.get_id();
+ let sub_id_len = k.len();
+ if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
+ info!("Dropping subscription with huge ({}) length", sub_id_len);
+ return Ok(());
+ }
+ // check if an existing subscription exists, and replace if so
+ if self.subscriptions.contains_key(&k) {
+ self.subscriptions.remove(&k);
+ self.subscriptions.insert(k, s);
+ info!("Replaced existing subscription");
+ return Ok(());
}
+
+ // check if there is room for another subscription.
+ if self.subscriptions.len() >= self.max_subs {
+ info!("Client has reached the maximum number of unique subscriptions");
+ return Ok(());
+ }
+ // add subscription
+ self.subscriptions.insert(k, s);
+ info!(
+ "Registered new subscription, currently have {} active subs",
+ self.subscriptions.len()
+ );
+ return Ok(());
+ }
+
+ pub fn unsubscribe(&mut self, c: Close) {
+ // TODO: return notice if subscription did not exist.
+ self.subscriptions.remove(&c.get_id());
+ info!(
+ "Removed subscription, currently have {} active subs",
+ self.subscriptions.len()
+ );
}
}
diff --git a/src/event.rs b/src/event.rs
@@ -16,7 +16,7 @@ pub struct EventCmd {
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct Event {
- pub(crate) id: String,
+ pub id: String,
pub(crate) pubkey: String,
pub(crate) created_at: u64,
pub(crate) kind: u64,
diff --git a/src/main.rs b/src/main.rs
@@ -1,5 +1,6 @@
use futures::StreamExt;
use log::*;
+use nostr_rs_relay::close::Close;
use nostr_rs_relay::conn;
use nostr_rs_relay::error::{Error, Result};
use nostr_rs_relay::event::Event;
@@ -66,7 +67,7 @@ async fn nostr_server(
//let task_queue = mpsc::channel::<NostrMessage>(16);
// track connection state so we can break when it fails
// Track internal client state
- let _conn = conn::ClientConn::new();
+ let mut conn = conn::ClientConn::new();
let mut conn_good = true;
loop {
tokio::select! {
@@ -77,26 +78,35 @@ async fn nostr_server(
// handle each type of message
let parsed : Result<Event> = Result::<Event>::from(ec);
match parsed {
- Ok(_) => {info!("Successfully parsed/validated event")},
+ Ok(e) => {
+ let id_prefix:String = e.id.chars().take(8).collect();
+ info!("Successfully parsed/validated event: {}", id_prefix)},
Err(_) => {info!("Invalid event ignored")}
}
},
Some(Ok(SubMsg(s))) => {
- info!("Sub-open request from client: {:?}", s);
+ // subscription handling consists of:
+ // adding new subscriptions to the client conn:
+ conn.subscribe(s).ok();
+ // TODO: sending a request for a SQL query
},
- Some(Ok(CloseMsg(c))) => {
- info!("Sub-close request from client: {:?}", c);
+ Some(Ok(CloseMsg(cc))) => {
+ // closing a request simply removes the subscription.
+ let parsed : Result<Close> = Result::<Close>::from(cc);
+ match parsed {
+ Ok(c) => {conn.unsubscribe(c);},
+ Err(_) => {info!("Invalid command ignored");}
+ }
},
None => {
info!("stream ended");
- //conn_good = true;
},
Some(Err(Error::ConnError)) => {
- info!("got connection error, disconnecting");
+ debug!("got connection error, disconnecting");
conn_good = false;
- if conn_good {
- info!("Lint bug?, https://github.com/rust-lang/rust/pull/57302");
- }
+ if conn_good {
+ info!("Lint bug?, https://github.com/rust-lang/rust/pull/57302");
+ }
return
}
Some(Err(e)) => {
@@ -105,7 +115,7 @@ async fn nostr_server(
}
}
}
- if conn_good == false {
+ if !conn_good {
break;
}
}
diff --git a/src/protostream.rs b/src/protostream.rs
@@ -1,4 +1,4 @@
-use crate::close::Close;
+use crate::close::CloseCmd;
use crate::error::{Error, Result};
use crate::event::EventCmd;
use crate::subscription::Subscription;
@@ -20,7 +20,7 @@ use tungstenite::protocol::Message;
pub enum NostrMessage {
EventMsg(EventCmd),
SubMsg(Subscription),
- CloseMsg(Close),
+ CloseMsg(CloseCmd),
}
// Either an event w/ subscription, or a notice