watch.rs (6412B)
1 use crate::invoice::Invoice; 2 use lnsocket::CallOpts; 3 use lnsocket::CommandoClient; 4 use serde::Deserialize; 5 use serde_json::json; 6 use std::sync::Arc; 7 8 #[derive(Deserialize)] 9 struct UpdatedInvoicesResponse { 10 updated: u64, 11 } 12 13 #[derive(Deserialize)] 14 struct PayIndexInvoices { 15 invoices: Vec<PayIndexScan>, 16 } 17 18 #[derive(Deserialize)] 19 struct PayIndexScan { 20 pay_index: Option<u64>, 21 } 22 23 async fn find_lastpay_index(commando: Arc<CommandoClient>) -> Result<Option<u64>, lnsocket::Error> { 24 const PAGE: u64 = 250; 25 // 1) get the current updated tail 26 let created_value = commando 27 .call( 28 "wait", 29 json!({"subsystem":"invoices","indexname":"updated","nextvalue":0}), 30 ) 31 .await?; 32 let response: UpdatedInvoicesResponse = 33 serde_json::from_value(created_value).map_err(|_| lnsocket::Error::Json)?; 34 35 // start our window at the tail 36 let mut start_at = response 37 .updated 38 .saturating_add(1) // +1 because we want max(1, updated - PAGE + 1) 39 .saturating_sub(PAGE) 40 .max(1); 41 42 loop { 43 // 2) fetch a window (indexed by "updated") 44 let val = commando 45 .call_with_opts( 46 "listinvoices", 47 json!({ 48 "index": "updated", 49 "start": start_at, 50 "limit": PAGE, 51 }), 52 // only fetch the one field we care about 53 CallOpts::default().filter(json!({ 54 "invoices": [{"pay_index": true}] 55 })), 56 ) 57 .await?; 58 59 let parsed: PayIndexInvoices = 60 serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?; 61 62 if let Some(pi) = parsed.invoices.iter().filter_map(|inv| inv.pay_index).max() { 63 return Ok(Some(pi)); 64 } 65 66 // 4) no paid invoice in this slice—step back or bail 67 if start_at == 1 { 68 return Ok(None); 69 } 70 71 start_at = start_at.saturating_sub(PAGE).max(1); 72 } 73 } 74 75 pub async fn fetch_paid_invoices( 76 commando: Arc<CommandoClient>, 77 limit: u32, 78 ) -> Result<Vec<Invoice>, lnsocket::Error> { 79 use tokio::task::JoinSet; 80 81 // look for an invoice with the last paid index 82 let Some(lastpay_index) = find_lastpay_index(commando.clone()).await? else { 83 // no paid invoices 84 return Ok(vec![]); 85 }; 86 87 let mut set: JoinSet<Result<Invoice, lnsocket::Error>> = JoinSet::new(); 88 let start = lastpay_index.saturating_sub(limit as u64); 89 90 // 3) Fire off at most `concurrency` `waitanyinvoice` calls at a time, 91 // collect all successful responses into a Vec. 92 // fire them ALL at once 93 for idx in start..lastpay_index { 94 let c = commando.clone(); 95 set.spawn(async move { 96 let val = c 97 .call( 98 "waitanyinvoice", 99 serde_json::json!({ "lastpay_index": idx }), 100 ) 101 .await?; 102 let parsed: Invoice = serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?; 103 Ok(parsed) 104 }); 105 } 106 107 let mut results = Vec::with_capacity(limit as usize); 108 while let Some(res) = set.join_next().await { 109 results.push(res.map_err(|_| lnsocket::Error::Io(std::io::ErrorKind::Interrupted))??); 110 } 111 112 results.sort_by(|a, b| a.updated_index.cmp(&b.updated_index).reverse()); 113 114 Ok(results) 115 } 116 117 // wip watch subsystem 118 /* 119 async fn watch_subsystem( 120 commando: CommandoClient, 121 subsystem: WaitSubsystem, 122 index: WaitIndex, 123 event_tx: UnboundedSender<Event>, 124 mut cancel_rx: Receiver<()>, 125 ) { 126 // Step 1: Fetch current index value so we can back up ~20 127 let mut nextvalue: u64 = match commando 128 .call( 129 "wait", 130 serde_json::json!({ 131 "indexname": index.as_str(), 132 "subsystem": subsystem.as_str(), 133 "nextvalue": 0 134 }), 135 ) 136 .await 137 { 138 Ok(v) => { 139 // You showed the result has `updated` as the current highest index 140 let current = v.get("updated").and_then(|x| x.as_u64()).unwrap_or(0); 141 current.saturating_sub(20) // back up 20, clamp at 0 142 } 143 Err(err) => { 144 tracing::warn!("initial wait(…nextvalue=0) failed: {}", err); 145 0 146 } 147 }; 148 149 loop { 150 // You can add a timeout to avoid hanging forever in weird network states. 151 let fut = commando.call( 152 "wait", 153 serde_json::to_value(WaitRequest { 154 indexname: "invoices".into(), 155 subsystem: "lightningd".into(), 156 nextvalue, 157 }) 158 .unwrap(), 159 ); 160 161 tokio::select! { 162 _ = &mut cancel_rx => { 163 // graceful shutdown 164 break; 165 } 166 167 res = fut => { 168 match res { 169 Ok(v) => { 170 // Typical shape: { "nextvalue": n, "invoicestatus": { ... } } (varies by plugin/index) 171 // Adjust these lookups for your node’s actual wait payload. 172 if let Some(nv) = v.get("nextvalue").and_then(|x| x.as_u64()) { 173 nextvalue = nv + 1; 174 } else { 175 // Defensive: never get stuck — bump at least by 1 176 nextvalue += 1; 177 } 178 179 // Inspect/route 180 let kind = v.get("status").and_then(|s| s.as_str()); 181 let ev = match kind { 182 Some("paid") => ClnResponse::Invoice(InvoiceEvent::Paid(v.clone())), 183 Some("created") => ClnResponse::Invoice(InvoiceEvent::Created(v.clone())), 184 _ => ClnResponse::Invoice(InvoiceEvent::Other(v.clone())), 185 }; 186 let _ = event_tx.send(Event::Response(ev)); 187 } 188 Err(err) => { 189 tracing::warn!("wait(invoices) error: {err}"); 190 // small backoff so we don't tight-loop on persistent errors 191 tokio::time::sleep(std::time::Duration::from_millis(500)).await; 192 } 193 } 194 } 195 } 196 } 197 } 198 */