events.rs (3910B)
1 use enostr::{PoolEventBuf, PoolRelay, RelayEvent, RelayMessage, RelayPool}; 2 use notedeck::{AppContext, UnknownIds}; 3 use tracing::{error, info}; 4 5 pub fn try_process_events_core( 6 app_ctx: &mut AppContext<'_>, 7 pool: &mut enostr::RelayPool, 8 ctx: &egui::Context, 9 mut receive: impl FnMut(&mut AppContext, &mut RelayPool, PoolEventBuf), 10 ) { 11 let ctx2 = ctx.clone(); 12 let wakeup = move || { 13 ctx2.request_repaint(); 14 }; 15 16 pool.keepalive_ping(wakeup); 17 18 // NOTE: we don't use the while let loop due to borrow issues 19 #[allow(clippy::while_let_loop)] 20 loop { 21 let ev = if let Some(ev) = pool.try_recv() { 22 ev.into_owned() 23 } else { 24 break; 25 }; 26 27 match (&ev.event).into() { 28 RelayEvent::Opened => { 29 tracing::trace!("Opened relay {}", ev.relay); 30 } 31 RelayEvent::Closed => tracing::warn!("{} connection closed", &ev.relay), 32 RelayEvent::Other(msg) => { 33 tracing::trace!("relay {} sent other event {:?}", ev.relay, &msg) 34 } 35 RelayEvent::Error(error) => error!("relay {} had error: {error:?}", &ev.relay), 36 RelayEvent::Message(msg) => { 37 process_message_core(app_ctx, pool, &ev.relay, &msg); 38 } 39 } 40 41 receive(app_ctx, pool, ev); 42 } 43 44 if app_ctx.unknown_ids.ready_to_send() { 45 pool_unknown_id_send(app_ctx.unknown_ids, pool); 46 } 47 } 48 49 fn process_message_core( 50 ctx: &mut AppContext<'_>, 51 pool: &mut enostr::RelayPool, 52 relay: &str, 53 msg: &RelayMessage, 54 ) { 55 match msg { 56 RelayMessage::Event(_subid, ev) => { 57 let relay = if let Some(relay) = pool.relays.iter().find(|r| r.url() == relay) { 58 relay 59 } else { 60 error!("couldn't find relay {} for note processing!?", relay); 61 return; 62 }; 63 64 match relay { 65 PoolRelay::Websocket(_) => { 66 //info!("processing event {}", event); 67 tracing::trace!("processing event {ev}"); 68 if let Err(err) = ctx.ndb.process_event_with( 69 ev, 70 nostrdb::IngestMetadata::new() 71 .client(false) 72 .relay(relay.url()), 73 ) { 74 error!("error processing event {ev}: {err}"); 75 } 76 } 77 PoolRelay::Multicast(_) => { 78 // multicast events are client events 79 if let Err(err) = ctx.ndb.process_event_with( 80 ev, 81 nostrdb::IngestMetadata::new() 82 .client(true) 83 .relay(relay.url()), 84 ) { 85 error!("error processing multicast event {ev}: {err}"); 86 } 87 } 88 } 89 } 90 RelayMessage::Notice(msg) => tracing::warn!("Notice from {}: {}", relay, msg), 91 RelayMessage::OK(cr) => info!("OK {:?}", cr), 92 RelayMessage::Eose(id) => { 93 tracing::trace!("Relay {} received eose: {id}", relay) 94 } 95 RelayMessage::Closed(sid, reason) => { 96 tracing::trace!( 97 "Relay {} with sub {sid} received close because: {reason}", 98 relay 99 ); 100 } 101 } 102 } 103 104 fn pool_unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut enostr::RelayPool) { 105 tracing::debug!("unknown_id_send called on: {:?}", &unknown_ids); 106 let filter = unknown_ids.filter().expect("filter"); 107 tracing::debug!( 108 "Getting {} unknown ids from relays", 109 unknown_ids.ids_iter().len() 110 ); 111 let msg = enostr::ClientMessage::req("unknownids".to_string(), filter); 112 unknown_ids.clear(); 113 pool.send(&msg); 114 }