notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 74ce87049d50239fe5e228bf8afde8752cc01678
parent 499f10ce3947a34f851c3e179ce82bec2adb3a6f
Author: William Casarin <jb55@jb55.com>
Date:   Wed,  7 Feb 2024 15:18:23 -0800

local nostrdb subscriptions working

Signed-off-by: William Casarin <jb55@jb55.com>

Diffstat:
Menostr/src/filter.rs | 16++++++++--------
Menostr/src/relay/message.rs | 158+++++++++++++++++++++++++++++++++++++------------------------------------------
Menostr/src/relay/pool.rs | 47+++++++++++++++++++++++------------------------
Msrc/app.rs | 52++++++++++++++++++++++++++++++++++++++++------------
Msrc/error.rs | 7+++++++
Msrc/filter.rs | 55+++++++++++++++++++++++++++----------------------------
Msrc/lib.rs | 1+
Asrc/timeline.rs | 23+++++++++++++++++++++++
8 files changed, 203 insertions(+), 156 deletions(-)

diff --git a/enostr/src/filter.rs b/enostr/src/filter.rs @@ -4,23 +4,23 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)] pub struct Filter { #[serde(skip_serializing_if = "Option::is_none")] - ids: Option<Vec<EventId>>, + pub ids: Option<Vec<EventId>>, #[serde(skip_serializing_if = "Option::is_none")] - authors: Option<Vec<Pubkey>>, + pub authors: Option<Vec<Pubkey>>, #[serde(skip_serializing_if = "Option::is_none")] - kinds: Option<Vec<u64>>, + pub kinds: Option<Vec<u64>>, #[serde(rename = "#e")] #[serde(skip_serializing_if = "Option::is_none")] - events: Option<Vec<EventId>>, + pub events: Option<Vec<EventId>>, #[serde(rename = "#p")] #[serde(skip_serializing_if = "Option::is_none")] - pubkeys: Option<Vec<Pubkey>>, + pub pubkeys: Option<Vec<Pubkey>>, #[serde(skip_serializing_if = "Option::is_none")] - since: Option<u64>, // unix timestamp seconds + pub since: Option<u64>, // unix timestamp seconds #[serde(skip_serializing_if = "Option::is_none")] - until: Option<u64>, // unix timestamp seconds + pub until: Option<u64>, // unix timestamp seconds #[serde(skip_serializing_if = "Option::is_none")] - limit: Option<u16>, + pub limit: Option<u16>, } impl Filter { diff --git a/enostr/src/relay/message.rs b/enostr/src/relay/message.rs @@ -1,68 +1,71 @@ use crate::Error; use crate::Event; use crate::Result; -use log::debug; use serde_json::Value; +use tracing::{error, info}; use ewebsock::{WsEvent, WsMessage}; #[derive(Debug, Eq, PartialEq)] -pub struct CommandResult { - event_id: String, +pub struct CommandResult<'a> { + event_id: &'a str, status: bool, - message: String, + message: &'a str, } #[derive(Debug, Eq, PartialEq)] -pub enum RelayMessage { - OK(CommandResult), - Eose(String), - Event(String, Event), - Notice(String), +pub enum RelayMessage<'a> { + OK(CommandResult<'a>), + Eose(&'a str), + Event(&'a str, &'a str), + Notice(&'a str), } #[derive(Debug)] -pub enum RelayEvent { +pub enum RelayEvent<'a> { Opened, Closed, - Other(WsMessage), - Message(RelayMessage), + Other(&'a WsMessage), + Error(Error), + Message(RelayMessage<'a>), } -impl TryFrom<WsEvent> for RelayEvent { - type Error = Error; - - fn try_from(message: WsEvent) -> Result<Self> { - match message { - WsEvent::Opened => Ok(RelayEvent::Opened), - WsEvent::Closed => Ok(RelayEvent::Closed), - WsEvent::Message(ws_msg) => ws_msg.try_into(), - WsEvent::Error(s) => Err(s.into()), +impl<'a> From<&'a WsEvent> for RelayEvent<'a> { + fn from(event: &'a WsEvent) -> RelayEvent<'a> { + match event { + WsEvent::Opened => RelayEvent::Opened, + WsEvent::Closed => RelayEvent::Closed, + WsEvent::Message(ref ws_msg) => ws_msg.into(), + WsEvent::Error(s) => RelayEvent::Error(Error::Generic(s.to_owned())), } } } -impl TryFrom<WsMessage> for RelayEvent { - type Error = Error; - - fn try_from(wsmsg: WsMessage) -> Result<Self> { +impl<'a> From<&'a WsMessage> for RelayEvent<'a> { + fn from(wsmsg: &'a WsMessage) -> RelayEvent<'a> { match wsmsg { - WsMessage::Text(s) => RelayMessage::from_json(&s).map(RelayEvent::Message), - wsmsg => Ok(RelayEvent::Other(wsmsg)), + WsMessage::Text(ref s) => match RelayMessage::from_json(&s).map(RelayEvent::Message) { + Ok(msg) => msg, + Err(err) => RelayEvent::Error(err), + }, + wsmsg => { + error!("got {:?} instead of WsMessage::Text", wsmsg); + RelayEvent::Error(Error::DecodeFailed) + } } } } -impl RelayMessage { - pub fn eose(subid: String) -> Self { +impl<'a> RelayMessage<'a> { + pub fn eose(subid: &'a str) -> Self { RelayMessage::Eose(subid) } - pub fn notice(msg: String) -> Self { + pub fn notice(msg: &'a str) -> Self { RelayMessage::Notice(msg) } - pub fn ok(event_id: String, status: bool, message: String) -> Self { + pub fn ok(event_id: &'a str, status: bool, message: &'a str) -> Self { RelayMessage::OK(CommandResult { event_id: event_id, status, @@ -70,74 +73,61 @@ impl RelayMessage { }) } - pub fn event(ev: Event, sub_id: String) -> Self { + pub fn event(ev: &'a str, sub_id: &'a str) -> Self { RelayMessage::Event(sub_id, ev) } - // I was lazy and took this from the nostr crate. thx yuki! - pub fn from_json(msg: &str) -> Result<Self> { + pub fn from_json(msg: &'a str) -> Result<RelayMessage<'a>> { if msg.is_empty() { return Err(Error::Empty); } - let v: Vec<Value> = serde_json::from_str(msg).map_err(|_| Error::DecodeFailed)?; - // Notice // Relay response format: ["NOTICE", <message>] - if v[0] == "NOTICE" { - if v.len() != 2 { - return Err(Error::DecodeFailed); - } - let v_notice: String = - serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?; - return Ok(Self::notice(v_notice)); + if &msg[0..=9] == "[\"NOTICE\"," { + // TODO: there could be more than one space, whatever + let start = if msg.bytes().nth(10) == Some(b' ') { + 12 + } else { + 11 + }; + let end = msg.len() - 2; + return Ok(Self::notice(&msg[start..end])); } // Event // Relay response format: ["EVENT", <subscription id>, <event JSON>] - if v[0] == "EVENT" { - if v.len() != 3 { - return Err(Error::DecodeFailed); - } - - let event = Event::from_json(&v[2].to_string()).map_err(|_| Error::DecodeFailed)?; - - let subscription_id: String = - serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?; - - return Ok(Self::event(event, subscription_id)); + if &msg[0..=7] == "[\"EVENT\"" { + return Ok(Self::event(msg, "fixme")); } // EOSE (NIP-15) // Relay response format: ["EOSE", <subscription_id>] - if v[0] == "EOSE" { - if v.len() != 2 { - return Err(Error::DecodeFailed); - } - - let subscription_id: String = - serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?; - - return Ok(Self::eose(subscription_id)); + if &msg[0..=7] == "[\"EOSE\"," { + let start = if msg.bytes().nth(8) == Some(b' ') { + 10 + } else { + 9 + }; + let end = msg.len() - 2; + return Ok(Self::eose(&msg[start..end])); } // OK (NIP-20) - // Relay response format: ["OK", <event_id>, <true|false>, <message>] - if v[0] == "OK" { - if v.len() != 4 { + // Relay response format: ["OK",<event_id>, <true|false>, <message>] + if &msg[0..=4] == "[\"OK\"," { + // TODO: fix this + let event_id = &msg[7..71]; + let booly = &msg[73..77]; + let status: bool = if booly == "true" { + true + } else if booly == "false" { + false + } else { return Err(Error::DecodeFailed); - } - - let event_id: String = - serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?; - - let status: bool = - serde_json::from_value(v[2].clone()).map_err(|_| Error::DecodeFailed)?; - - let message: String = - serde_json::from_value(v[3].clone()).map_err(|_| Error::DecodeFailed)?; + }; - return Ok(Self::ok(event_id, status, message)); + return Ok(Self::ok(event_id, status, "fixme")); } Err(Error::DecodeFailed) @@ -209,9 +199,9 @@ mod tests { #[test] fn test_handle_invalid_event() { //Mising Event field - let invalid_event_msg = r#"["EVENT", "random_string"]"#; + let invalid_event_msg = r#"["EVENT","random_string"]"#; //Event JSON with incomplete content - let invalid_event_msg_content = r#"["EVENT", "random_string", {"id":"70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5","pubkey":"379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"}]"#; + let invalid_event_msg_content = r#"["EVENT","random_string",{"id":"70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5","pubkey":"379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"}]"#; assert_eq!( RelayMessage::from_json(invalid_event_msg).unwrap_err(), @@ -246,14 +236,14 @@ mod tests { // The subscription ID is not string assert_eq!( - RelayMessage::from_json(r#"["EOSE", 404]"#).unwrap_err(), + RelayMessage::from_json(r#"["EOSE",404]"#).unwrap_err(), Error::DecodeFailed ); } #[test] fn test_handle_valid_ok() -> Result<()> { - let valid_ok_msg = r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", true, "pow: difficulty 25>=24"]"#; + let valid_ok_msg = r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",true,"pow: difficulty 25>=24"]"#; let handled_valid_ok_msg = RelayMessage::ok( "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30".to_string(), true, @@ -269,7 +259,7 @@ mod tests { // Missing params assert_eq!( RelayMessage::from_json( - r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30"]"# + r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30"]"# ) .unwrap_err(), Error::DecodeFailed @@ -277,13 +267,13 @@ mod tests { // Invalid status assert_eq!( - RelayMessage::from_json(r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", hello, ""]"#).unwrap_err(), + RelayMessage::from_json(r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",hello,""]"#).unwrap_err(), Error::DecodeFailed ); // Invalid message assert_eq!( - RelayMessage::from_json(r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", hello, 404]"#).unwrap_err(), + RelayMessage::from_json(r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",hello,404]"#).unwrap_err(), Error::DecodeFailed ); } diff --git a/enostr/src/relay/pool.rs b/enostr/src/relay/pool.rs @@ -1,21 +1,19 @@ -use crate::relay::message::RelayEvent; +use crate::relay::message::{RelayEvent, RelayMessage}; use crate::relay::{Relay, RelayStatus}; use crate::{ClientMessage, Result}; use std::time::{Duration, Instant}; #[cfg(not(target_arch = "wasm32"))] -use ewebsock::WsMessage; +use ewebsock::{WsEvent, WsMessage}; #[cfg(not(target_arch = "wasm32"))] -use tracing::debug; - -use tracing::error; +use tracing::{debug, error, info}; #[derive(Debug)] pub struct PoolEvent<'a> { pub relay: &'a str, - pub event: RelayEvent, + pub event: ewebsock::WsEvent, } pub struct PoolRelay { @@ -145,37 +143,38 @@ impl RelayPool { /// function searches each relay in the list in order, attempting to /// receive a message from each. If a message is received, return it. /// If no message is received from any relays, None is returned. - pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { + pub fn try_recv<'a>(&'a mut self) -> Option<PoolEvent<'a>> { for relay in &mut self.relays { let relay = &mut relay.relay; - if let Some(msg) = relay.receiver.try_recv() { - match msg.try_into() { - Ok(event) => { + if let Some(event) = relay.receiver.try_recv() { + match &event { + WsEvent::Opened => { relay.status = RelayStatus::Connected; - + } + WsEvent::Closed => { + relay.status = RelayStatus::Disconnected; + } + WsEvent::Error(err) => { + error!("{:?}", err); + relay.status = RelayStatus::Disconnected; + } + WsEvent::Message(ev) => { // let's just handle pongs here. // We only need to do this natively. #[cfg(not(target_arch = "wasm32"))] - match event { - RelayEvent::Other(WsMessage::Ping(ref bs)) => { + match &ev { + WsMessage::Ping(ref bs) => { debug!("pong {}", &relay.url); relay.sender.send(WsMessage::Pong(bs.to_owned())); } _ => {} } - - return Some(PoolEvent { - event, - relay: &relay.url, - }); - } - - Err(e) => { - relay.status = RelayStatus::Disconnected; - error!("try_recv {:?}", e); - continue; } } + return Some(PoolEvent { + event, + relay: &relay.url, + }); } } diff --git a/src/app.rs b/src/app.rs @@ -10,6 +10,7 @@ use egui::widgets::Spinner; use egui::{Context, Frame, ImageSource, Margin, TextureHandle, TextureId}; use egui_extras::Size; use enostr::{ClientMessage, EventId, Filter, Profile, Pubkey, RelayEvent, RelayMessage}; +use nostrdb::{Config, Ndb, Subscription}; use poll_promise::Promise; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; @@ -48,25 +49,31 @@ pub struct Damus { compose: String, pool: RelayPool, + home_sub: Option<Subscription>, all_events: HashMap<EventId, Event>, events: Vec<EventId>, img_cache: ImageCache, + ndb: Ndb, frame_history: crate::frame_history::FrameHistory, } impl Default for Damus { fn default() -> Self { + let mut config = Config::new(); + config.set_ingester_threads(2); Self { state: DamusState::Initializing, contacts: Contacts::new(), all_events: HashMap::new(), pool: RelayPool::new(), + home_sub: None, events: vec![], img_cache: HashMap::new(), n_panels: 1, + ndb: Ndb::new(".", &config).expect("ndb"), compose: "".to_string(), frame_history: FrameHistory::default(), } @@ -92,14 +99,19 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { } } -fn send_initial_filters(pool: &mut RelayPool, relay_url: &str) { - let filter = Filter::new().limit(100).kinds(vec![1, 42]).pubkeys( +fn get_home_filter() -> Filter { + Filter::new().limit(100).kinds(vec![1, 42]).pubkeys( [ Pubkey::from_hex("32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245") .unwrap(), ] .into(), - ); + ) +} + +fn send_initial_filters(pool: &mut RelayPool, relay_url: &str) { + let filter = get_home_filter(); + info!("Sending initial filters to {}", relay_url); let subid = "initial"; for relay in &mut pool.relays { @@ -129,15 +141,23 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) { while let Some(ev) = damus.pool.try_recv() { let relay = ev.relay.to_owned(); - match ev.event { + match (&ev.event).into() { RelayEvent::Opened => send_initial_filters(&mut damus.pool, &relay), // TODO: handle reconnects RelayEvent::Closed => warn!("{} connection closed", &relay), + RelayEvent::Error(e) => error!("{}", e), RelayEvent::Other(msg) => debug!("other event {:?}", &msg), - RelayEvent::Message(msg) => process_message(damus, &relay, msg), + RelayEvent::Message(msg) => process_message(damus, &relay, &msg), + } + } + + // do we have any new processed events? + if let Some(ref sub) = damus.home_sub { + let new_notes = damus.ndb.poll_for_notes(sub, 50); + if new_notes.len() > 0 { + info!("{} new notes! {:?}", new_notes.len(), new_notes); } } - //info!("recv {:?}", ev) } #[cfg(feature = "profiling")] @@ -145,6 +165,12 @@ fn setup_profiling() { puffin::set_scopes_on(true); // tell puffin to collect data } +fn setup_initial_nostrdb_subs(damus: &mut Damus) -> Result<()> { + let filter: nostrdb::Filter = crate::filter::convert_enostr_filter(&get_home_filter()); + damus.home_sub = Some(damus.ndb.subscribe(filter)?); + Ok(()) +} + fn update_damus(damus: &mut Damus, ctx: &egui::Context) { if damus.state == DamusState::Initializing { #[cfg(feature = "profiling")] @@ -154,6 +180,7 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { damus.pool = RelayPool::new(); relay_setup(&mut damus.pool, ctx); damus.state = DamusState::Initialized; + setup_initial_nostrdb_subs(damus).expect("home subscription failed"); } try_process_event(damus, ctx); @@ -196,15 +223,15 @@ fn process_metadata_event(damus: &mut Damus, ev: &Event) { } } -fn process_event(damus: &mut Damus, _subid: &str, event: Event) { +fn process_event(damus: &mut Damus, _subid: &str, event: &str) { #[cfg(feature = "profiling")] puffin::profile_function!(); - if damus.all_events.get(&event.id).is_some() { - return; - } + //info!("processing event {}", event); + damus.ndb.process_event(&event); - let kind = event.kind; + /* + let kind = event.kind(); if kind == 0 { process_metadata_event(damus, &event); } else if kind == 1 { @@ -212,6 +239,7 @@ fn process_event(damus: &mut Damus, _subid: &str, event: Event) { damus.all_events.insert(cloned_id.clone(), event); damus.events.insert(0, cloned_id); } + */ } fn get_unknown_author_ids(damus: &Damus) -> Vec<Pubkey> { @@ -247,7 +275,7 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) { } } -fn process_message(damus: &mut Damus, relay: &str, msg: RelayMessage) { +fn process_message(damus: &mut Damus, relay: &str, msg: &RelayMessage) { match msg { RelayMessage::Event(subid, ev) => process_event(damus, &subid, ev), RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), diff --git a/src/error.rs b/src/error.rs @@ -3,6 +3,7 @@ use shatter::parser; #[derive(Debug)] pub enum Error { Nostr(enostr::Error), + Ndb(nostrdb::Error), Shatter(parser::Error), Image(image::error::ImageError), Generic(String), @@ -20,6 +21,12 @@ impl From<parser::Error> for Error { } } +impl From<nostrdb::Error> for Error { + fn from(e: nostrdb::Error) -> Self { + Error::Ndb(e) + } +} + impl From<image::error::ImageError> for Error { fn from(err: image::error::ImageError) -> Self { Error::Image(err) diff --git a/src/filter.rs b/src/filter.rs @@ -1,37 +1,36 @@ -impl From<enostr::Filter> for nostrdb::Filter {} - fn from(filter: enostr::Filter) -> Self { - let mut nfilter = nostrdb::Filter::new(); +pub fn convert_enostr_filter(filter: &enostr::Filter) -> nostrdb::Filter { + let mut nfilter = nostrdb::Filter::new(); - if let Some(ids) = filter.ids { - nfilter.ids(ids) - } - - if let Some(authors) = filter.authors { - nfilter.authors(authors) - } + if let Some(ref ids) = filter.ids { + nfilter.ids(ids.iter().map(|a| *a.bytes()).collect()); + } - if let Some(kinds) = filter.kinds { - nfilter.kinds(kinds) - } + if let Some(ref authors) = filter.authors { + let authors: Vec<[u8; 32]> = authors.iter().map(|a| a.bytes()).collect(); + nfilter.authors(authors); + } - // #e - if let Some(events) = filter.events { - nfilter.tags(events, 'e') - } + if let Some(ref kinds) = filter.kinds { + nfilter.kinds(kinds.clone()); + } - // #p - if let Some(pubkeys) = filter.pubkeys { - nfilter.pubkeys(pubkeys) - } + // #e + if let Some(ref events) = filter.events { + nfilter.events(events.iter().map(|a| *a.bytes()).collect()); + } - if let Some(since) = filter.since { - nfilter.since(since) - } + // #p + if let Some(ref pubkeys) = filter.pubkeys { + nfilter.pubkeys(pubkeys.iter().map(|a| a.bytes()).collect()); + } - if let Some(limit) = filter.limit { - nfilter.limit(limit) - } + if let Some(since) = filter.since { + nfilter.since(since); + } - nfilter + if let Some(limit) = filter.limit { + nfilter.limit(limit.into()); } + + nfilter } diff --git a/src/lib.rs b/src/lib.rs @@ -8,6 +8,7 @@ mod abbrev; mod fonts; mod images; mod result; +mod filter; mod ui; mod frame_history; diff --git a/src/timeline.rs b/src/timeline.rs @@ -0,0 +1,23 @@ +pub fn binary_search<T: Ord>(a: &[T], item: &T) -> usize { + let mut low = 0; + let mut high = a.len(); + + while low < high { + let mid = low + (high - low) / 2; + if item <= &a[mid] { + high = mid; + } else { + low = mid + 1; + } + } + + low +} + +pub fn binary_insertion_sort<T: Ord>(vec: &mut Vec<T>) { + for i in 1..vec.len() { + let val = vec.remove(i); + let pos = binary_search(&vec[0..i], &val); + vec.insert(pos, val); + } +}