commit 6c94987a7aa274799767f87a461ea853a760048b
parent 7d9a23c87dfaa04622774031f8dd4015f640e673
Author: William Casarin <jb55@jb55.com>
Date: Thu, 26 Sep 2024 14:01:12 -0700
Merge multi-subscriber #287
kernelkind (2):
multi subscriber impl
implement multi_subscriber for Thread
Diffstat:
4 files changed, 160 insertions(+), 165 deletions(-)
diff --git a/src/actionbar.rs b/src/actionbar.rs
@@ -1,4 +1,5 @@
use crate::{
+ multi_subscriber::MultiSubscriber,
note::NoteRef,
notecache::NoteCache,
route::{Route, Router},
@@ -6,8 +7,6 @@ use crate::{
};
use enostr::{NoteId, RelayPool};
use nostrdb::{Ndb, Transaction};
-use tracing::{error, info};
-use uuid::Uuid;
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum BarAction {
@@ -70,42 +69,15 @@ fn open_thread(
ThreadResult::Fresh(thread) => (thread, None),
};
- // only start a subscription on nav and if we don't have
- // an active subscription for this thread.
- if thread.subscription().is_none() {
+ let multi_subscriber = if let Some(multi_subscriber) = &mut thread.multi_subscriber {
+ multi_subscriber
+ } else {
let filters = Thread::filters(root_id);
- *thread.subscription_mut() = ndb.subscribe(&filters).ok();
-
- if thread.remote_subscription().is_some() {
- error!("Found active remote subscription when it was not expected");
- } else {
- let subid = Uuid::new_v4().to_string();
- *thread.remote_subscription_mut() = Some(subid.clone());
- pool.subscribe(subid, filters);
- }
+ thread.multi_subscriber = Some(MultiSubscriber::new(filters));
+ thread.multi_subscriber.as_mut().unwrap()
+ };
- match thread.subscription() {
- Some(_sub) => {
- thread.subscribers += 1;
- info!(
- "Locally/remotely subscribing to thread. {} total active subscriptions, {} on this thread",
- ndb.subscription_count(),
- thread.subscribers,
- );
- }
- None => error!(
- "Error subscribing locally to selected note '{}''s thread",
- hex::encode(selected_note)
- ),
- }
- } else {
- thread.subscribers += 1;
- info!(
- "Re-using existing thread subscription. {} total active subscriptions, {} on this thread",
- ndb.subscription_count(),
- thread.subscribers,
- )
- }
+ multi_subscriber.subscribe(ndb, pool);
result
}
diff --git a/src/lib.rs b/src/lib.rs
@@ -21,6 +21,7 @@ mod key_parsing;
mod key_storage;
pub mod login_manager;
mod macos_key_storage;
+mod multi_subscriber;
mod nav;
mod note;
mod notecache;
diff --git a/src/multi_subscriber.rs b/src/multi_subscriber.rs
@@ -0,0 +1,133 @@
+use enostr::{Filter, RelayPool};
+use nostrdb::{Ndb, Note, Transaction};
+use tracing::{debug, error, info};
+use uuid::Uuid;
+
+use crate::{filter::UnifiedSubscription, note::NoteRef, Error};
+
+pub struct MultiSubscriber {
+ filters: Vec<Filter>,
+ sub: Option<UnifiedSubscription>,
+ subscribers: u32,
+}
+
+impl MultiSubscriber {
+ pub fn new(filters: Vec<Filter>) -> Self {
+ Self {
+ filters,
+ sub: None,
+ subscribers: 0,
+ }
+ }
+
+ fn real_subscribe(
+ ndb: &Ndb,
+ pool: &mut RelayPool,
+ filters: Vec<Filter>,
+ ) -> Option<UnifiedSubscription> {
+ let subid = Uuid::new_v4().to_string();
+ let sub = ndb.subscribe(&filters).ok()?;
+
+ pool.subscribe(subid.clone(), filters);
+
+ Some(UnifiedSubscription {
+ local: sub,
+ remote: subid,
+ })
+ }
+
+ pub fn unsubscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
+ if self.subscribers == 0 {
+ error!("No subscribers to unsubscribe from");
+ return;
+ }
+
+ self.subscribers -= 1;
+ if self.subscribers == 0 {
+ let sub = match self.sub {
+ Some(ref sub) => sub,
+ None => {
+ error!("No remote subscription to unsubscribe from");
+ return;
+ }
+ };
+ let local_sub = &sub.local;
+ if let Err(e) = ndb.unsubscribe(*local_sub) {
+ error!(
+ "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions",
+ local_sub.id(),
+ ndb.subscription_count()
+ );
+ } else {
+ info!(
+ "Unsubscribed from object subid:{}. {} active subscriptions",
+ local_sub.id(),
+ ndb.subscription_count()
+ );
+ }
+
+ // unsub from remote
+ pool.unsubscribe(sub.remote.clone());
+ self.sub = None;
+ } else {
+ info!(
+ "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object",
+ ndb.subscription_count(),
+ self.subscribers,
+ );
+ }
+ }
+
+ pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
+ self.subscribers += 1;
+ if self.subscribers == 1 {
+ if self.sub.is_some() {
+ error!("Object is first subscriber, but it already had remote subscription");
+ return;
+ }
+
+ self.sub = Self::real_subscribe(ndb, pool, self.filters.clone());
+ info!(
+ "Remotely subscribing to object. {} total active subscriptions, {} on this object",
+ ndb.subscription_count(),
+ self.subscribers,
+ );
+
+ if self.sub.is_none() {
+ error!("Error subscribing remotely to object");
+ }
+ } else {
+ info!(
+ "Locally subscribing. {} total active subscriptions, {} for this object",
+ ndb.subscription_count(),
+ self.subscribers,
+ )
+ }
+ }
+
+ pub fn poll_for_notes(&mut self, ndb: &Ndb, txn: &Transaction) -> Result<Vec<NoteRef>, Error> {
+ let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?;
+ let new_note_keys = ndb.poll_for_notes(sub.local, 500);
+
+ if new_note_keys.is_empty() {
+ return Ok(vec![]);
+ } else {
+ debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
+ }
+
+ let mut notes: Vec<Note<'_>> = Vec::with_capacity(new_note_keys.len());
+ for key in new_note_keys {
+ let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
+ note
+ } else {
+ continue;
+ };
+
+ notes.push(note);
+ }
+
+ let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
+
+ Ok(note_refs)
+ }
+}
diff --git a/src/thread.rs b/src/thread.rs
@@ -1,27 +1,19 @@
use crate::{
+ multi_subscriber::MultiSubscriber,
note::NoteRef,
notecache::NoteCache,
timeline::{TimelineTab, ViewFilter},
Error, Result,
};
use enostr::RelayPool;
-use nostrdb::{Filter, FilterBuilder, Ndb, Note, Subscription, Transaction};
-use std::cmp::Ordering;
+use nostrdb::{Filter, FilterBuilder, Ndb, Transaction};
use std::collections::HashMap;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, warn};
#[derive(Default)]
pub struct Thread {
view: TimelineTab,
- sub: Option<Subscription>,
- remote_sub: Option<String>,
- pub subscribers: i32,
-}
-
-#[derive(Debug, Eq, PartialEq, Copy, Clone)]
-pub enum DecrementResult {
- LastSubscriber(Subscription),
- ActiveSubscribers,
+ pub multi_subscriber: Option<MultiSubscriber>,
}
impl Thread {
@@ -32,15 +24,10 @@ impl Thread {
}
let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap);
view.notes = notes;
- let sub: Option<Subscription> = None;
- let remote_sub: Option<String> = None;
- let subscribers: i32 = 0;
Thread {
view,
- sub,
- remote_sub,
- subscribers,
+ multi_subscriber: None,
}
}
@@ -53,37 +40,18 @@ impl Thread {
}
#[must_use = "UnknownIds::update_from_note_refs should be used on this result"]
- pub fn poll_notes_into_view<'a>(
- &mut self,
- txn: &'a Transaction,
- ndb: &Ndb,
- ) -> Result<Vec<Note<'a>>> {
- let sub = self.subscription().expect("thread subscription");
- let new_note_keys = ndb.poll_for_notes(sub, 500);
- if new_note_keys.is_empty() {
- return Ok(vec![]);
- } else {
- debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
- }
-
- let mut notes: Vec<Note<'a>> = Vec::with_capacity(new_note_keys.len());
- for key in new_note_keys {
- let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
- note
- } else {
- continue;
- };
-
- notes.push(note);
- }
-
- {
+ pub fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result<()> {
+ if let Some(multi_subscriber) = &mut self.multi_subscriber {
let reversed = true;
- let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
+ let note_refs: Vec<NoteRef> = multi_subscriber.poll_for_notes(ndb, txn)?;
self.view.insert(¬e_refs, reversed);
+ } else {
+ return Err(Error::Generic(
+ "Thread unexpectedly has no MultiSubscriber".to_owned(),
+ ));
}
- Ok(notes)
+ Ok(())
}
/// Look for new thread notes since our last fetch
@@ -112,38 +80,6 @@ impl Thread {
}
}
- pub fn decrement_sub(&mut self) -> Result<DecrementResult> {
- self.subscribers -= 1;
-
- match self.subscribers.cmp(&0) {
- Ordering::Equal => {
- if let Some(sub) = self.subscription() {
- Ok(DecrementResult::LastSubscriber(sub))
- } else {
- Err(Error::no_active_sub())
- }
- }
- Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)),
- Ordering::Greater => Ok(DecrementResult::ActiveSubscribers),
- }
- }
-
- pub fn subscription(&self) -> Option<Subscription> {
- self.sub
- }
-
- pub fn remote_subscription(&self) -> &Option<String> {
- &self.remote_sub
- }
-
- pub fn remote_subscription_mut(&mut self) -> &mut Option<String> {
- &mut self.remote_sub
- }
-
- pub fn subscription_mut(&mut self) -> &mut Option<Subscription> {
- &mut self.sub
- }
-
fn filters_raw(root: &[u8; 32]) -> Vec<FilterBuilder> {
vec![
nostrdb::Filter::new().kinds([1]).event(root),
@@ -253,59 +189,12 @@ pub fn thread_unsubscribe(
note_cache: &mut NoteCache,
id: &[u8; 32],
) {
- let (unsubscribe, remote_subid) = {
- let txn = Transaction::new(ndb).expect("txn");
- let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id);
-
- let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr();
- let unsub = thread.decrement_sub();
-
- let mut remote_subid: Option<String> = None;
- if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub {
- *thread.subscription_mut() = None;
- remote_subid = thread.remote_subscription().to_owned();
- *thread.remote_subscription_mut() = None;
- }
+ let txn = Transaction::new(ndb).expect("txn");
+ let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id);
- (unsub, remote_subid)
- };
+ let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr();
- match unsubscribe {
- Ok(DecrementResult::LastSubscriber(sub)) => {
- if let Err(e) = ndb.unsubscribe(sub) {
- error!(
- "failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions",
- sub.id(),
- ndb.subscription_count()
- );
- } else {
- info!(
- "Unsubscribed from thread subid:{}. {} active subscriptions",
- sub.id(),
- ndb.subscription_count()
- );
- }
-
- // unsub from remote
- if let Some(subid) = remote_subid {
- pool.unsubscribe(subid);
- }
- }
-
- Ok(DecrementResult::ActiveSubscribers) => {
- info!(
- "Keeping thread subscription. {} active subscriptions.",
- ndb.subscription_count()
- );
- // do nothing
- }
-
- Err(e) => {
- // something is wrong!
- error!(
- "Thread unsubscribe error: {e}. {} active subsciptions.",
- ndb.subscription_count()
- );
- }
+ if let Some(multi_subscriber) = &mut thread.multi_subscriber {
+ multi_subscriber.unsubscribe(ndb, pool);
}
}