nostrdb-rs

nostrdb in rust!
git clone git://jb55.com/nostrdb-rs
Log | Files | Refs | Submodules | README | LICENSE

future.rs (2865B)


      1 use crate::{Ndb, NoteKey, Subscription};
      2 
      3 use std::{
      4     pin::Pin,
      5     task::{Context, Poll},
      6 };
      7 
      8 use futures::Stream;
      9 use tracing::error;
     10 
     11 /// Used to track query futures
     12 #[derive(Debug, Clone)]
     13 pub(crate) struct SubscriptionState {
     14     pub done: bool,
     15     pub waker: Option<std::task::Waker>,
     16 }
     17 
     18 /// A subscription that you can .await on. This can enables very clean
     19 /// integration into Rust's async state machinery.
     20 pub struct SubscriptionStream {
     21     // some handle or state
     22     // e.g., a reference to a non-blocking API or a shared atomic state
     23     ndb: Ndb,
     24     sub_id: Subscription,
     25     max_notes: u32,
     26     unsubscribe_on_drop: bool,
     27 }
     28 
     29 impl SubscriptionStream {
     30     pub fn new(ndb: Ndb, sub_id: Subscription) -> Self {
     31         // Most of the time we only want to fetch a few things. If expecting
     32         // lots of data, use `set_max_notes_per_await`
     33         let max_notes = 32;
     34         let unsubscribe_on_drop = true;
     35         SubscriptionStream {
     36             ndb,
     37             sub_id,
     38             unsubscribe_on_drop,
     39             max_notes,
     40         }
     41     }
     42 
     43     pub fn notes_per_await(mut self, max_notes: u32) -> Self {
     44         self.max_notes = max_notes;
     45         self
     46     }
     47 
     48     /// Unsubscribe the subscription when this stream goes out of scope. On
     49     /// by default. Recommended unless you want subscription leaks.
     50     pub fn unsubscribe_on_drop(mut self, yes: bool) -> Self {
     51         self.unsubscribe_on_drop = yes;
     52         self
     53     }
     54 
     55     pub fn sub_id(&self) -> Subscription {
     56         self.sub_id
     57     }
     58 }
     59 
     60 impl Drop for SubscriptionStream {
     61     fn drop(&mut self) {
     62         // Perform cleanup here, like removing the subscription from the global map
     63         {
     64             let mut map = self.ndb.subs.lock().unwrap();
     65             map.remove(&self.sub_id);
     66         }
     67         // unsubscribe
     68         if let Err(err) = self.ndb.unsubscribe(self.sub_id) {
     69             error!(
     70                 "Error unsubscribing from {} in SubscriptionStream Drop: {err}",
     71                 self.sub_id.id()
     72             );
     73         }
     74     }
     75 }
     76 
     77 impl Stream for SubscriptionStream {
     78     type Item = Vec<NoteKey>;
     79 
     80     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     81         let pinned = std::pin::pin!(self);
     82         let me = pinned.as_ref().get_ref();
     83         let mut map = me.ndb.subs.lock().unwrap();
     84         let sub_state = map.entry(me.sub_id).or_insert(SubscriptionState {
     85             done: false,
     86             waker: None,
     87         });
     88 
     89         // we've unsubscribed
     90         if sub_state.done {
     91             return Poll::Ready(None);
     92         }
     93 
     94         let notes = me.ndb.poll_for_notes(me.sub_id, me.max_notes);
     95         if !notes.is_empty() {
     96             return Poll::Ready(Some(notes));
     97         }
     98 
     99         // Not ready yet, store waker
    100         sub_state.waker = Some(cx.waker().clone());
    101         std::task::Poll::Pending
    102     }
    103 }