commit 20a570efcaaf3a868e450181846773fbd9af1c8c
parent 39099b8a3873adc1556a7238d3c2fa1efce7bd9c
Author: Thomas <31560900+0xtlt@users.noreply.github.com>
Date: Sun, 6 Nov 2022 12:46:02 +0100
Merge pull request #2 from 0xtlt/0xtlt/issue1
🐛 Multi relays support
Diffstat:
4 files changed, 103 insertions(+), 49 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
@@ -1,4 +1,12 @@
-# 0.1.0 - NIP-01 Support
+# Changelog
+
+## 0.2.0 - Architecture change
+
+- Removed: `Client.listen` function (Replaced by `Client.next_data`)
+- Added: `Client.next_data` function
+- Added: `Client.remove_relay` function
+
+## 0.1.0 - NIP-01 Support
- Added: `Client` structure
- Added: `Client::new` function
diff --git a/Cargo.toml b/Cargo.toml
@@ -8,7 +8,7 @@ keywords = ["nostr", "rust", "protocol", "encryption", "decryption"]
categories = ["api-bindings"]
license = "MIT"
authors = ["Thomas Tastet"]
-version = "0.1.0"
+version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/README.md b/README.md
@@ -16,7 +16,7 @@ This example uses [Tungstenite](https://crates.io/crates/tungstenite) for event
```toml
[dependencies]
-nostr_rust = "0.1"
+nostr_rust = "0.2"
tungstenite = "0.17"
```
@@ -32,8 +32,8 @@ use tungstenite::Message;
use nostr_rust::{nostr_client::Client, req::ReqFilter, Identity};
-fn handle_message(relay_url: String, message: Message) -> Result<(), String> {
- println!("Received message from {}: {:?}", { relay_url }, message);
+fn handle_message(relay_url: &String, message: &Message) -> Result<(), String> {
+ println!("Received message from {}: {:?}", relay_url, message);
Ok(())
}
@@ -51,7 +51,11 @@ fn main() {
let nostr_clone = nostr_client.clone();
let handle_thread = thread::spawn(move || {
println!("Listening...");
- nostr_clone.lock().unwrap().listen(handle_message).unwrap();
+ let events = nostr_clone.lock().unwrap().next_data().unwrap();
+
+ for (relay_url, message) in events.iter() {
+ handle_message(relay_url, message).unwrap();
+ }
});
// Change metadata
diff --git a/src/nostr_client.rs b/src/nostr_client.rs
@@ -1,15 +1,15 @@
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
use crate::events::Event;
use crate::req::{Req, ReqFilter};
use crate::websocket::SimplifiedWS;
use serde_json::json;
use tungstenite::Message;
-/// Relay Type contains the relay address and the websocket connection
-pub type Relay = (String, SimplifiedWS);
-
/// Nostr Client
pub struct Client {
- pub relays: Vec<Relay>,
+ pub relays: HashMap<String, Arc<Mutex<SimplifiedWS>>>,
}
impl Client {
@@ -21,7 +21,9 @@ impl Client {
/// let client = Client::new(vec!["wss://nostr-pub.wellorder.net"]).unwrap();
/// ```
pub fn new(default_relays: Vec<&str>) -> Result<Self, String> {
- let mut client = Self { relays: vec![] };
+ let mut client = Self {
+ relays: HashMap::new(),
+ };
for relay in default_relays {
client.add_relay(relay)?;
@@ -37,7 +39,7 @@ impl Client {
/// ```rust
/// use nostr_rust::nostr_client::Client;
/// let mut client = Client::new(vec!["wss://nostr-pub.wellorder.net"]).unwrap();
- /// client.add_relay("wss://nostr-pub.wellorder.net").unwrap();
+ /// client.add_relay("wss://relay.damus.io").unwrap();
/// ```
pub fn add_relay(&mut self, relay: &str) -> Result<(), String> {
let client = match SimplifiedWS::new(relay) {
@@ -45,7 +47,42 @@ impl Client {
Err(err) => return Err(format!("Error connecting to relay: {}", err)),
};
- self.relays.push((relay.to_string(), client));
+ // Check if relay is already added
+ if self.relays.contains_key(relay) {
+ return Err(format!("Relay {} already added", relay));
+ }
+
+ self.relays
+ .insert(relay.to_string(), Arc::new(Mutex::new(client)));
+
+ Ok(())
+ }
+
+ /// Remove a relay from the client
+ /// # Example
+ /// ```rust
+ /// use nostr_rust::nostr_client::Client;
+ /// let mut client = Client::new(vec!["wss://nostr-pub.wellorder.net"]).unwrap();
+ /// client.remove_relay("wss://nostr-pub.wellorder.net").unwrap();
+ /// ```
+ pub fn remove_relay(&mut self, relay: &str) -> Result<(), String> {
+ println!("Removing relay {}", relay);
+ if !self.relays.contains_key(relay) {
+ println!("Relay {} not found", relay);
+ return Err(format!("Relay {} not found", relay));
+ }
+
+ println!("Removing relay {}", relay);
+
+ // Close the connection
+ self.relays
+ .remove(relay)
+ .unwrap()
+ .lock()
+ .unwrap()
+ .socket
+ .close(None)
+ .unwrap();
Ok(())
}
@@ -54,13 +91,16 @@ impl Client {
pub fn publish_event(&mut self, event: &Event) -> Result<(), String> {
let json_stringified = json!(["EVENT", event]).to_string();
let message = Message::text(json_stringified);
- match self.relays[0].1.send_message(&message) {
- Ok(_) => Ok(()),
- Err(_) => Err("Unable to send message".to_string()),
+
+ for relay in self.relays.values() {
+ let mut relay = relay.lock().unwrap();
+ relay.send_message(&message)?;
}
+
+ Ok(())
}
- /// Listen for data from the relays
+ /// Get next data from the relays
/// # Example
/// ```rust
/// use std::{
@@ -70,7 +110,7 @@ impl Client {
/// use tungstenite::Message;
/// use nostr_rust::{nostr_client::Client, req::ReqFilter};
///
- /// fn handle_message(relay_url: String, message: Message) -> Result<(), String> {
+ /// fn handle_message(relay_url: &String, message: &Message) -> Result<(), String> {
/// println!("Received message: {:?}", message);
///
/// Ok(())
@@ -80,9 +120,12 @@ impl Client {
///
/// // Run a new thread to listen
/// let nostr_clone = client.clone();
- /// let nostr_thread = thread::spawn(move || {
- /// println!("Listening...");
- /// nostr_clone.lock().unwrap().listen(handle_message).unwrap();
+ /// let nostr_thread = thread::spawn(move || loop {
+ /// let events = nostr_clone.lock().unwrap().next_data().unwrap();
+ ///
+ /// for (relay_url, message) in events.iter() {
+ /// handle_message(relay_url, message).unwrap();
+ /// }
/// });
///
/// // Subscribe to the most beautiful Nostr profile event
@@ -103,28 +146,18 @@ impl Client {
/// }])
/// .unwrap();
///
- /// // Wait 2s for the thread to finish
- /// std::thread::sleep(std::time::Duration::from_secs(2));
+ /// // Wait 3s for the thread to finish
+ /// std::thread::sleep(std::time::Duration::from_secs(3));
/// ```
- pub fn listen<F, E>(&mut self, callback: F) -> Result<(), E>
- where
- F: Fn(String, Message) -> Result<(), E>,
- {
- for relay in &mut self.relays {
- let client = &mut relay.1;
- println!("Listening for messages from relay {}", relay.0);
- loop {
- match client.read_message() {
- Ok(message) => callback(relay.0.clone(), message),
- Err(err) => {
- println!("Error reading message: {}", err);
- continue;
- }
- }?;
- }
+ pub fn next_data(&mut self) -> Result<Vec<(String, tungstenite::Message)>, String> {
+ let mut events: Vec<(String, tungstenite::Message)> = Vec::new();
+
+ for (relay_name, socket) in self.relays.iter() {
+ let message = socket.lock().unwrap().read_message()?;
+ events.push((relay_name.clone(), message));
}
- Ok(())
+ Ok(events)
}
/// Subscribe
@@ -150,10 +183,13 @@ impl Client {
pub fn subscribe(&mut self, filters: Vec<ReqFilter>) -> Result<String, String> {
let req = Req::new(None, filters);
let message = Message::text(req.to_string());
- match self.relays[0].1.send_message(&message) {
- Ok(_) => Ok(req.subscription_id),
- Err(_) => Err("Unable to send message".to_string()),
+
+ for relay in self.relays.values() {
+ let mut relay = relay.lock().unwrap();
+ relay.send_message(&message)?;
}
+
+ Ok(req.subscription_id)
}
/// Subscribe with a specific ID
@@ -184,10 +220,13 @@ impl Client {
) -> Result<(), String> {
let req = Req::new(Some(subscription_id), filters);
let message = Message::text(req.to_string());
- match self.relays[0].1.send_message(&message) {
- Ok(_) => Ok(()),
- Err(_) => Err("Unable to send message".to_string()),
+
+ for relay in self.relays.values() {
+ let mut relay = relay.lock().unwrap();
+ relay.send_message(&message)?;
}
+
+ Ok(())
}
/// Unsubscribe
@@ -213,9 +252,12 @@ impl Client {
/// ```
pub fn unsubscribe(&mut self, subscription_id: &str) -> Result<(), String> {
let message = Message::text(json!(["CLOSE", subscription_id]).to_string());
- match self.relays[0].1.send_message(&message) {
- Ok(_) => Ok(()),
- Err(_) => Err("Unable to send message".to_string()),
+
+ for relay in self.relays.values() {
+ let mut relay = relay.lock().unwrap();
+ relay.send_message(&message)?;
}
+
+ Ok(())
}
}