future.rs (2865B)
1 use crate::{Ndb, NoteKey, Subscription}; 2 3 use std::{ 4 pin::Pin, 5 task::{Context, Poll}, 6 }; 7 8 use futures::Stream; 9 use tracing::error; 10 11 /// Used to track query futures 12 #[derive(Debug, Clone)] 13 pub(crate) struct SubscriptionState { 14 pub done: bool, 15 pub waker: Option<std::task::Waker>, 16 } 17 18 /// A subscription that you can .await on. This can enables very clean 19 /// integration into Rust's async state machinery. 20 pub struct SubscriptionStream { 21 // some handle or state 22 // e.g., a reference to a non-blocking API or a shared atomic state 23 ndb: Ndb, 24 sub_id: Subscription, 25 max_notes: u32, 26 unsubscribe_on_drop: bool, 27 } 28 29 impl SubscriptionStream { 30 pub fn new(ndb: Ndb, sub_id: Subscription) -> Self { 31 // Most of the time we only want to fetch a few things. If expecting 32 // lots of data, use `set_max_notes_per_await` 33 let max_notes = 32; 34 let unsubscribe_on_drop = true; 35 SubscriptionStream { 36 ndb, 37 sub_id, 38 unsubscribe_on_drop, 39 max_notes, 40 } 41 } 42 43 pub fn notes_per_await(mut self, max_notes: u32) -> Self { 44 self.max_notes = max_notes; 45 self 46 } 47 48 /// Unsubscribe the subscription when this stream goes out of scope. On 49 /// by default. Recommended unless you want subscription leaks. 50 pub fn unsubscribe_on_drop(mut self, yes: bool) -> Self { 51 self.unsubscribe_on_drop = yes; 52 self 53 } 54 55 pub fn sub_id(&self) -> Subscription { 56 self.sub_id 57 } 58 } 59 60 impl Drop for SubscriptionStream { 61 fn drop(&mut self) { 62 // Perform cleanup here, like removing the subscription from the global map 63 { 64 let mut map = self.ndb.subs.lock().unwrap(); 65 map.remove(&self.sub_id); 66 } 67 // unsubscribe 68 if let Err(err) = self.ndb.unsubscribe(self.sub_id) { 69 error!( 70 "Error unsubscribing from {} in SubscriptionStream Drop: {err}", 71 self.sub_id.id() 72 ); 73 } 74 } 75 } 76 77 impl Stream for SubscriptionStream { 78 type Item = Vec<NoteKey>; 79 80 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 81 let pinned = std::pin::pin!(self); 82 let me = pinned.as_ref().get_ref(); 83 let mut map = me.ndb.subs.lock().unwrap(); 84 let sub_state = map.entry(me.sub_id).or_insert(SubscriptionState { 85 done: false, 86 waker: None, 87 }); 88 89 // we've unsubscribed 90 if sub_state.done { 91 return Poll::Ready(None); 92 } 93 94 let notes = me.ndb.poll_for_notes(me.sub_id, me.max_notes); 95 if !notes.is_empty() { 96 return Poll::Ready(Some(notes)); 97 } 98 99 // Not ready yet, store waker 100 sub_state.waker = Some(cx.waker().clone()); 101 std::task::Poll::Pending 102 } 103 }