nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

commit bbbf8b3e9623c9655b2d76e9b56a8d779d7d20e0
parent 4376f59efcb5b4ab7e2be9bbed13386256683f7b
Author: William Casarin <jb55@jb55.com>
Date:   Tue,  8 Nov 2022 19:57:12 -0800

Command result events

This introduces a new notice-like event with more structure so that clients
can know if an event is sucesfully written to the database. Clients
can't really do much with the current NOTICE messages, but with these
structured result messages, it can know if an event was sucessfully
saved.

Whenever there is an error and we have an id available (event id, sub
id, etc) we return these structured OK events instead.

Example:

When saving the following event:
    ["EVENT",{"id": "event_id" }]

The server will now return events in the following format:
    ["OK", event_id, "true|false", message]

For example, on a successful save:
    ["OK", "event_id", "true"]

If we already have the event:
    ["OK", "event_id", "true", "duplicate"]

If the event is rejected:
    ["OK", "event_id", "false", "you are blocked"]

If a subscription fails:
    ["OK", "sub_id", "false", "Too many subscriptions"]

NIP coming soon!

Diffstat:
Msrc/db.rs | 26+++++++++++++++-----------
Msrc/lib.rs | 1+
Asrc/notice.rs | 48++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/server.rs | 37++++++++++++++++++++++++-------------
4 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/src/db.rs b/src/db.rs @@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; use crate::nip05; +use crate::notice::Notice; use crate::schema::{upgrade_db, STARTUP_SQL}; use crate::subscription::ReqFilter; use crate::subscription::Subscription; @@ -32,7 +33,7 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection /// Events submitted from a client, with a return channel for notices pub struct SubmittedEvent { pub event: Event, - pub notice_tx: tokio::sync::mpsc::Sender<String>, + pub notice_tx: tokio::sync::mpsc::Sender<Notice>, } /// Database file @@ -158,7 +159,9 @@ pub async fn db_writer( event.get_event_id_prefix() ); notice_tx - .try_send("pubkey is not allowed to publish to this relay".to_owned()) + .try_send(Notice::message( + "pubkey is not allowed to publish to this relay".to_owned(), + )) .ok(); continue; } @@ -189,10 +192,10 @@ pub async fn db_writer( event.get_author_prefix() ); notice_tx - .try_send( + .try_send(Notice::message( "NIP-05 verification is no longer valid (expired/wrong domain)" .to_owned(), - ) + )) .ok(); continue; } @@ -203,7 +206,9 @@ pub async fn db_writer( event.get_author_prefix() ); notice_tx - .try_send("NIP-05 verification needed to publish events".to_owned()) + .try_send(Notice::message( + "NIP-05 verification needed to publish events".to_owned(), + )) .ok(); continue; } @@ -229,6 +234,7 @@ pub async fn db_writer( Ok(updated) => { if updated == 0 { trace!("ignoring duplicate or deleted event"); + notice_tx.try_send(Notice::duplicate(event.id)).ok(); } else { info!( "persisted event: {:?} from: {:?} in: {:?}", @@ -239,16 +245,14 @@ pub async fn db_writer( event_write = true; // send this out to all clients bcast_tx.send(event.clone()).ok(); + notice_tx.try_send(Notice::saved(event.id)).ok(); } } Err(err) => { warn!("event insert failed: {:?}", err); - notice_tx - .try_send( - "relay experienced an error trying to publish the latest event" - .to_owned(), - ) - .ok(); + let msg = + "relay experienced an error trying to publish the latest event".into(); + notice_tx.try_send(Notice::err_msg(msg, event.id)).ok(); } } } diff --git a/src/lib.rs b/src/lib.rs @@ -8,6 +8,7 @@ pub mod event; pub mod hexrange; pub mod info; pub mod nip05; +pub mod notice; pub mod schema; pub mod subscription; pub mod utils; diff --git a/src/notice.rs b/src/notice.rs @@ -0,0 +1,48 @@ +use crate::error; + +pub enum EventResultStatus { + Saved, + Duplicate, + Error(String), +} + +pub struct EventResult { + pub id: String, + pub status: EventResultStatus, +} + +pub enum Notice { + Message(String), + EventResult(EventResult), +} + +impl Notice { + pub fn err(err: error::Error, id: String) -> Notice { + Notice::err_msg(format!("{}", err), id) + } + + pub fn message(msg: String) -> Notice { + Notice::Message(msg) + } + + pub fn saved(id: String) -> Notice { + Notice::EventResult(EventResult { + id, + status: EventResultStatus::Saved, + }) + } + + pub fn duplicate(id: String) -> Notice { + Notice::EventResult(EventResult { + id, + status: EventResultStatus::Duplicate, + }) + } + + pub fn err_msg(msg: String, id: String) -> Notice { + Notice::EventResult(EventResult { + id, + status: EventResultStatus::Error(msg), + }) + } +} diff --git a/src/server.rs b/src/server.rs @@ -10,6 +10,7 @@ use crate::event::Event; use crate::event::EventCmd; use crate::info::RelayInfo; use crate::nip05; +use crate::notice::{EventResultStatus, Notice}; use crate::subscription::Subscription; use futures::SinkExt; use futures::StreamExt; @@ -405,8 +406,17 @@ fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage> } /// Turn a string into a NOTICE message ready to send over a WebSocket -fn make_notice_message(msg: &str) -> Message { - Message::text(json!(["NOTICE", msg]).to_string()) +fn make_notice_message(notice: Notice) -> Message { + let json = match notice { + Notice::Message(ref msg) => json!(["NOTICE", msg]), + Notice::EventResult(ref res) => match &res.status { + EventResultStatus::Saved => json!(["OK", res.id, "true"]), + EventResultStatus::Duplicate => json!(["OK", res.id, "true", "duplicate"]), + EventResultStatus::Error(msg) => json!(["OK", res.id, "false", msg]), + }, + }; + + Message::text(json.to_string()) } struct ClientInfo { @@ -435,7 +445,7 @@ async fn nostr_server( // we will send out the tx handle to any query we generate. let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256); // Create channel for receiving NOTICEs - let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32); + let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32); // last time this client sent data (message, ping, etc.) let mut last_message_time = Instant::now(); @@ -480,7 +490,7 @@ async fn nostr_server( ws_stream.send(Message::Ping(Vec::new())).await.ok(); }, Some(notice_msg) = notice_rx.recv() => { - ws_stream.send(make_notice_message(&notice_msg)).await.ok(); + ws_stream.send(make_notice_message(notice_msg)).await.ok(); }, Some(query_result) = query_rx.recv() => { // database informed us of a query result we asked for @@ -528,7 +538,7 @@ async fn nostr_server( }, Some(Ok(Message::Binary(_))) => { ws_stream.send( - make_notice_message("binary messages are not accepted")).await.ok(); + make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok(); continue; }, Some(Ok(Message::Ping(_) | Message::Pong(_))) => { @@ -538,8 +548,7 @@ async fn nostr_server( }, Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => { ws_stream.send( - make_notice_message( - &format!("message too large ({} > {})",size, max_size))).await.ok(); + make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok(); continue; }, None | @@ -581,13 +590,15 @@ async fn nostr_server( } else { info!("client: {} sent a far future-dated event", cid); if let Some(fut_sec) = settings.options.reject_future_seconds { - ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok(); + let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec); + let notice = Notice::err_msg(msg, e.id); + ws_stream.send(make_notice_message(notice)).await.ok(); } } }, Err(_) => { info!("client: {} sent an invalid event", cid); - ws_stream.send(make_notice_message("event was invalid")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("event was invalid".into()))).await.ok(); } } }, @@ -609,7 +620,7 @@ async fn nostr_server( }, Err(e) => { info!("Subscription error: {}", e); - ws_stream.send(make_notice_message(&e.to_string())).await.ok(); + ws_stream.send(make_notice_message(Notice::err(e, s.id))).await.ok(); } } }, @@ -628,7 +639,7 @@ async fn nostr_server( conn.unsubscribe(&c); } else { info!("invalid command ignored"); - ws_stream.send(make_notice_message("could not parse command")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok(); } }, Err(Error::ConnError) => { @@ -637,11 +648,11 @@ async fn nostr_server( } Err(Error::EventMaxLengthError(s)) => { info!("client: {} sent event larger ({} bytes) than max size", cid, s); - ws_stream.send(make_notice_message("event exceeded max size")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok(); }, Err(Error::ProtoParseError) => { info!("client {} sent event that could not be parsed", cid); - ws_stream.send(make_notice_message("could not parse command")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok(); }, Err(e) => { info!("got non-fatal error from client: {}, error: {:?}", cid, e);