notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

watch.rs (6412B)


      1 use crate::invoice::Invoice;
      2 use lnsocket::CallOpts;
      3 use lnsocket::CommandoClient;
      4 use serde::Deserialize;
      5 use serde_json::json;
      6 use std::sync::Arc;
      7 
      8 #[derive(Deserialize)]
      9 struct UpdatedInvoicesResponse {
     10     updated: u64,
     11 }
     12 
     13 #[derive(Deserialize)]
     14 struct PayIndexInvoices {
     15     invoices: Vec<PayIndexScan>,
     16 }
     17 
     18 #[derive(Deserialize)]
     19 struct PayIndexScan {
     20     pay_index: Option<u64>,
     21 }
     22 
     23 async fn find_lastpay_index(commando: Arc<CommandoClient>) -> Result<Option<u64>, lnsocket::Error> {
     24     const PAGE: u64 = 250;
     25     // 1) get the current updated tail
     26     let created_value = commando
     27         .call(
     28             "wait",
     29             json!({"subsystem":"invoices","indexname":"updated","nextvalue":0}),
     30         )
     31         .await?;
     32     let response: UpdatedInvoicesResponse =
     33         serde_json::from_value(created_value).map_err(|_| lnsocket::Error::Json)?;
     34 
     35     // start our window at the tail
     36     let mut start_at = response
     37         .updated
     38         .saturating_add(1) // +1 because we want max(1, updated - PAGE + 1)
     39         .saturating_sub(PAGE)
     40         .max(1);
     41 
     42     loop {
     43         // 2) fetch a window (indexed by "updated")
     44         let val = commando
     45             .call_with_opts(
     46                 "listinvoices",
     47                 json!({
     48                     "index": "updated",
     49                     "start": start_at,
     50                     "limit": PAGE,
     51                 }),
     52                 // only fetch the one field we care about
     53                 CallOpts::default().filter(json!({
     54                     "invoices": [{"pay_index": true}]
     55                 })),
     56             )
     57             .await?;
     58 
     59         let parsed: PayIndexInvoices =
     60             serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?;
     61 
     62         if let Some(pi) = parsed.invoices.iter().filter_map(|inv| inv.pay_index).max() {
     63             return Ok(Some(pi));
     64         }
     65 
     66         // 4) no paid invoice in this slice—step back or bail
     67         if start_at == 1 {
     68             return Ok(None);
     69         }
     70 
     71         start_at = start_at.saturating_sub(PAGE).max(1);
     72     }
     73 }
     74 
     75 pub async fn fetch_paid_invoices(
     76     commando: Arc<CommandoClient>,
     77     limit: u32,
     78 ) -> Result<Vec<Invoice>, lnsocket::Error> {
     79     use tokio::task::JoinSet;
     80 
     81     // look for an invoice with the last paid index
     82     let Some(lastpay_index) = find_lastpay_index(commando.clone()).await? else {
     83         // no paid invoices
     84         return Ok(vec![]);
     85     };
     86 
     87     let mut set: JoinSet<Result<Invoice, lnsocket::Error>> = JoinSet::new();
     88     let start = lastpay_index.saturating_sub(limit as u64);
     89 
     90     // 3) Fire off at most `concurrency` `waitanyinvoice` calls at a time,
     91     //    collect all successful responses into a Vec.
     92     // fire them ALL at once
     93     for idx in start..lastpay_index {
     94         let c = commando.clone();
     95         set.spawn(async move {
     96             let val = c
     97                 .call(
     98                     "waitanyinvoice",
     99                     serde_json::json!({ "lastpay_index": idx }),
    100                 )
    101                 .await?;
    102             let parsed: Invoice = serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?;
    103             Ok(parsed)
    104         });
    105     }
    106 
    107     let mut results = Vec::with_capacity(limit as usize);
    108     while let Some(res) = set.join_next().await {
    109         results.push(res.map_err(|_| lnsocket::Error::Io(std::io::ErrorKind::Interrupted))??);
    110     }
    111 
    112     results.sort_by(|a, b| a.updated_index.cmp(&b.updated_index).reverse());
    113 
    114     Ok(results)
    115 }
    116 
    117 // wip watch subsystem
    118 /*
    119 async fn watch_subsystem(
    120     commando: CommandoClient,
    121     subsystem: WaitSubsystem,
    122     index: WaitIndex,
    123     event_tx: UnboundedSender<Event>,
    124     mut cancel_rx: Receiver<()>,
    125 ) {
    126     // Step 1: Fetch current index value so we can back up ~20
    127     let mut nextvalue: u64 = match commando
    128         .call(
    129             "wait",
    130             serde_json::json!({
    131                 "indexname": index.as_str(),
    132                 "subsystem": subsystem.as_str(),
    133                 "nextvalue": 0
    134             }),
    135         )
    136         .await
    137     {
    138         Ok(v) => {
    139             // You showed the result has `updated` as the current highest index
    140             let current = v.get("updated").and_then(|x| x.as_u64()).unwrap_or(0);
    141             current.saturating_sub(20) // back up 20, clamp at 0
    142         }
    143         Err(err) => {
    144             tracing::warn!("initial wait(…nextvalue=0) failed: {}", err);
    145             0
    146         }
    147     };
    148 
    149     loop {
    150         // You can add a timeout to avoid hanging forever in weird network states.
    151         let fut = commando.call(
    152             "wait",
    153             serde_json::to_value(WaitRequest {
    154                 indexname: "invoices".into(),
    155                 subsystem: "lightningd".into(),
    156                 nextvalue,
    157             })
    158             .unwrap(),
    159         );
    160 
    161         tokio::select! {
    162             _ = &mut cancel_rx => {
    163                 // graceful shutdown
    164                 break;
    165             }
    166 
    167             res = fut => {
    168                 match res {
    169                     Ok(v) => {
    170                         // Typical shape: { "nextvalue": n, "invoicestatus": { ... } } (varies by plugin/index)
    171                         // Adjust these lookups for your node’s actual wait payload.
    172                         if let Some(nv) = v.get("nextvalue").and_then(|x| x.as_u64()) {
    173                             nextvalue = nv + 1;
    174                         } else {
    175                             // Defensive: never get stuck — bump at least by 1
    176                             nextvalue += 1;
    177                         }
    178 
    179                         // Inspect/route
    180                         let kind = v.get("status").and_then(|s| s.as_str());
    181                         let ev = match kind {
    182                             Some("paid") => ClnResponse::Invoice(InvoiceEvent::Paid(v.clone())),
    183                             Some("created") => ClnResponse::Invoice(InvoiceEvent::Created(v.clone())),
    184                             _ => ClnResponse::Invoice(InvoiceEvent::Other(v.clone())),
    185                         };
    186                         let _ = event_tx.send(Event::Response(ev));
    187                     }
    188                     Err(err) => {
    189                         tracing::warn!("wait(invoices) error: {err}");
    190                         // small backoff so we don't tight-loop on persistent errors
    191                         tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    192                     }
    193                 }
    194             }
    195         }
    196     }
    197 }
    198 */