lib.rs (10917B)
1 use crate::channels::Channel; 2 use crate::channels::Channels; 3 use crate::channels::ListPeerChannel; 4 use crate::event::ClnResponse; 5 use crate::event::ConnectionState; 6 use crate::event::Event; 7 use crate::event::LoadingState; 8 use crate::event::Request; 9 use crate::invoice::Invoice; 10 use crate::summary::Summary; 11 use crate::watch::fetch_paid_invoices; 12 13 use lnsocket::bitcoin::secp256k1::{PublicKey, SecretKey, rand}; 14 use lnsocket::{CommandoClient, LNSocket}; 15 use nostrdb::Ndb; 16 use notedeck::AppContext; 17 use notedeck::AppResponse; 18 use serde_json::json; 19 use std::collections::HashMap; 20 use std::str::FromStr; 21 use std::sync::Arc; 22 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; 23 24 mod channels; 25 mod event; 26 mod invoice; 27 mod summary; 28 mod ui; 29 mod watch; 30 31 #[derive(Default)] 32 pub struct ClnDash { 33 initialized: bool, 34 connection_state: ConnectionState, 35 summary: LoadingState<Summary, lnsocket::Error>, 36 get_info: LoadingState<String, lnsocket::Error>, 37 channels: LoadingState<Channels, lnsocket::Error>, 38 invoices: LoadingState<Vec<Invoice>, lnsocket::Error>, 39 channel: Option<CommChannel>, 40 last_summary: Option<Summary>, 41 // invoice label to zapreq id 42 invoice_zap_reqs: HashMap<String, [u8; 32]>, 43 } 44 45 #[derive(serde::Deserialize)] 46 pub struct ZapReqId { 47 #[serde(with = "hex::serde")] 48 id: [u8; 32], 49 } 50 51 impl Default for ConnectionState { 52 fn default() -> Self { 53 ConnectionState::Dead("uninitialized".to_string()) 54 } 55 } 56 57 struct CommChannel { 58 req_tx: UnboundedSender<Request>, 59 event_rx: UnboundedReceiver<Event>, 60 } 61 62 impl notedeck::App for ClnDash { 63 fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { 64 if !self.initialized { 65 self.connection_state = ConnectionState::Connecting; 66 67 self.setup_connection(); 68 self.initialized = true; 69 } 70 71 self.process_events(ctx.ndb); 72 73 self.show(ui, ctx); 74 75 AppResponse::none() 76 } 77 } 78 79 impl ClnDash { 80 fn show(&mut self, ui: &mut egui::Ui, ctx: &mut AppContext) { 81 egui::Frame::new() 82 .inner_margin(egui::Margin::same(20)) 83 .show(ui, |ui| { 84 egui::ScrollArea::vertical().show(ui, |ui| { 85 ui::connection_state_ui(ui, &self.connection_state); 86 crate::summary::summary_ui(ui, self.last_summary.as_ref(), &self.summary); 87 crate::invoice::invoices_ui(ui, &self.invoice_zap_reqs, ctx, &self.invoices); 88 crate::channels::channels_ui(ui, &self.channels); 89 crate::ui::get_info_ui(ui, &self.get_info); 90 }); 91 }); 92 } 93 94 fn setup_connection(&mut self) { 95 let (req_tx, mut req_rx) = unbounded_channel::<Request>(); 96 let (event_tx, event_rx) = unbounded_channel::<Event>(); 97 self.channel = Some(CommChannel { req_tx, event_rx }); 98 99 tokio::spawn(async move { 100 let key = SecretKey::new(&mut rand::thread_rng()); 101 let their_pubkey = PublicKey::from_str(&std::env::var("CLNDASH_ID").unwrap_or( 102 "03f3c108ccd536b8526841f0a5c58212bb9e6584a1eb493080e7c1cc34f82dad71".to_string(), 103 )) 104 .unwrap(); 105 106 let host = std::env::var("CLNDASH_HOST").unwrap_or("ln.damus.io:9735".to_string()); 107 let lnsocket = match LNSocket::connect_and_init(key, their_pubkey, &host).await { 108 Err(err) => { 109 let _ = event_tx.send(Event::Ended { 110 reason: err.to_string(), 111 }); 112 return; 113 } 114 115 Ok(lnsocket) => { 116 let _ = event_tx.send(Event::Connected); 117 lnsocket 118 } 119 }; 120 121 let rune = std::env::var("CLNDASH_RUNE").unwrap_or( 122 "Vns1Zxvidr4J8pP2ZCg3Wjp2SyGyyf5RHgvFG8L36yM9MzMmbWV0aG9kPWdldGluZm8=".to_string(), 123 ); 124 let commando = Arc::new(CommandoClient::spawn(lnsocket, &rune)); 125 126 loop { 127 match req_rx.recv().await { 128 None => { 129 let _ = event_tx.send(Event::Ended { 130 reason: "channel dead?".to_string(), 131 }); 132 break; 133 } 134 135 Some(req) => { 136 tracing::debug!("calling {req:?}"); 137 match req { 138 Request::GetInfo => { 139 let event_tx = event_tx.clone(); 140 let commando = commando.clone(); 141 tokio::spawn(async move { 142 match commando.call("getinfo", json!({})).await { 143 Ok(v) => { 144 let _ = event_tx 145 .send(Event::Response(ClnResponse::GetInfo(v))); 146 } 147 Err(err) => { 148 tracing::error!("get_info error {}", err); 149 } 150 } 151 }); 152 } 153 154 Request::PaidInvoices(n) => { 155 let event_tx = event_tx.clone(); 156 let commando = commando.clone(); 157 tokio::spawn(async move { 158 let invoices = fetch_paid_invoices(commando, n).await; 159 let _ = event_tx 160 .send(Event::Response(ClnResponse::PaidInvoices(invoices))); 161 }); 162 } 163 164 Request::ListPeerChannels => { 165 let event_tx = event_tx.clone(); 166 let commando = commando.clone(); 167 tokio::spawn(async move { 168 let peer_channels = 169 commando.call("listpeerchannels", json!({})).await; 170 let channels = peer_channels.map(|v| { 171 let peer_channels: Vec<ListPeerChannel> = 172 serde_json::from_value(v["channels"].clone()).unwrap(); 173 to_channels(peer_channels) 174 }); 175 let _ = event_tx.send(Event::Response( 176 ClnResponse::ListPeerChannels(channels), 177 )); 178 }); 179 } 180 } 181 } 182 } 183 } 184 }); 185 } 186 187 fn process_events(&mut self, ndb: &Ndb) { 188 let Some(channel) = &mut self.channel else { 189 return; 190 }; 191 192 while let Ok(event) = channel.event_rx.try_recv() { 193 match event { 194 Event::Ended { reason } => { 195 self.connection_state = ConnectionState::Dead(reason); 196 } 197 198 Event::Connected => { 199 self.connection_state = ConnectionState::Active; 200 let _ = channel.req_tx.send(Request::GetInfo); 201 let _ = channel.req_tx.send(Request::ListPeerChannels); 202 let _ = channel.req_tx.send(Request::PaidInvoices(100)); 203 } 204 205 Event::Response(resp) => match resp { 206 ClnResponse::ListPeerChannels(chans) => { 207 if let LoadingState::Loaded(prev) = &self.channels { 208 self.last_summary = Some(crate::summary::compute_summary(prev)); 209 } 210 211 self.summary = match &chans { 212 Ok(chans) => { 213 LoadingState::Loaded(crate::summary::compute_summary(chans)) 214 } 215 Err(err) => LoadingState::Failed(err.clone()), 216 }; 217 self.channels = LoadingState::from_result(chans); 218 } 219 220 ClnResponse::GetInfo(value) => { 221 let res = serde_json::to_string_pretty(&value); 222 self.get_info = 223 LoadingState::from_result(res.map_err(|_| lnsocket::Error::Json)); 224 } 225 226 ClnResponse::PaidInvoices(invoices) => { 227 // process zap requests 228 229 if let Ok(invoices) = &invoices { 230 for invoice in invoices { 231 let zap_req_id: Option<ZapReqId> = 232 serde_json::from_str(&invoice.description).ok(); 233 if let Some(zap_req_id) = zap_req_id { 234 self.invoice_zap_reqs 235 .insert(invoice.label.clone(), zap_req_id.id); 236 let _ = ndb.process_event(&format!( 237 "[\"EVENT\",\"a\",{}]", 238 &invoice.description 239 )); 240 } 241 } 242 } 243 244 self.invoices = LoadingState::from_result(invoices); 245 } 246 }, 247 } 248 } 249 } 250 } 251 252 fn to_channels(peer_channels: Vec<ListPeerChannel>) -> Channels { 253 let mut avail_out: i64 = 0; 254 let mut avail_in: i64 = 0; 255 let mut max_total_msat: i64 = 0; 256 257 let mut channels: Vec<Channel> = peer_channels 258 .into_iter() 259 .map(|c| { 260 let to_us = (c.to_us_msat - c.our_reserve_msat).max(0); 261 let to_them_raw = (c.total_msat - c.to_us_msat).max(0); 262 let to_them = (to_them_raw - c.their_reserve_msat).max(0); 263 264 avail_out += to_us; 265 avail_in += to_them; 266 if c.total_msat > max_total_msat { 267 max_total_msat = c.total_msat; // <-- max, not sum 268 } 269 270 Channel { 271 to_us, 272 to_them, 273 original: c, 274 } 275 }) 276 .collect(); 277 278 channels.sort_by(|a, b| { 279 let a_capacity = a.to_them + a.to_us; 280 let b_capacity = b.to_them + b.to_us; 281 282 a_capacity.partial_cmp(&b_capacity).unwrap().reverse() 283 }); 284 285 Channels { 286 max_total_msat, 287 avail_out, 288 avail_in, 289 channels, 290 } 291 }