commit 6544d43d02b0528080436481ed9ff9b3175fd47c
parent 64ac06791af36fd36327be479cbccfee1aad148c
Author: kernelkind <kernelkind@gmail.com>
Date: Wed, 16 Jul 2025 12:47:31 -0400
replace `MultiSubscriber` with `TimelineSub`
Signed-off-by: kernelkind <kernelkind@gmail.com>
Diffstat:
3 files changed, 257 insertions(+), 182 deletions(-)
diff --git a/crates/notedeck_columns/src/multi_subscriber.rs b/crates/notedeck_columns/src/multi_subscriber.rs
@@ -2,151 +2,10 @@ use egui_nav::ReturnType;
use enostr::{Filter, NoteId, RelayPool};
use hashbrown::HashMap;
use nostrdb::{Ndb, Subscription};
-use tracing::{error, info};
+use notedeck::UnifiedSubscription;
use uuid::Uuid;
-use crate::timeline::ThreadSelection;
-
-#[derive(Debug)]
-pub struct MultiSubscriber {
- pub filters: Vec<Filter>,
- pub local_subid: Option<Subscription>,
- pub remote_subid: Option<String>,
- local_subscribers: u32,
- remote_subscribers: u32,
-}
-
-impl MultiSubscriber {
- /// Create a MultiSubscriber with an initial local subscription.
- pub fn with_initial_local_sub(sub: Subscription, filters: Vec<Filter>) -> Self {
- let mut msub = MultiSubscriber::new(filters);
- msub.local_subid = Some(sub);
- msub.local_subscribers = 1;
- msub
- }
-
- pub fn new(filters: Vec<Filter>) -> Self {
- Self {
- filters,
- local_subid: None,
- remote_subid: None,
- local_subscribers: 0,
- remote_subscribers: 0,
- }
- }
-
- fn unsubscribe_remote(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
- let remote_subid = if let Some(remote_subid) = &self.remote_subid {
- remote_subid
- } else {
- self.err_log(ndb, "unsubscribe_remote: nothing to unsubscribe from?");
- return;
- };
-
- pool.unsubscribe(remote_subid.clone());
-
- self.remote_subid = None;
- }
-
- /// Locally unsubscribe if we have one
- fn unsubscribe_local(&mut self, ndb: &mut Ndb) {
- let local_sub = if let Some(local_sub) = self.local_subid {
- local_sub
- } else {
- self.err_log(ndb, "unsubscribe_local: nothing to unsubscribe from?");
- return;
- };
-
- match ndb.unsubscribe(local_sub) {
- Err(e) => {
- self.err_log(ndb, &format!("Failed to unsubscribe: {e}"));
- }
- Ok(_) => {
- self.local_subid = None;
- }
- }
- }
-
- pub fn unsubscribe(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) -> bool {
- if self.local_subscribers == 0 && self.remote_subscribers == 0 {
- self.err_log(
- ndb,
- "Called multi_subscriber unsubscribe when both sub counts are 0",
- );
- return false;
- }
-
- self.local_subscribers = self.local_subscribers.saturating_sub(1);
- self.remote_subscribers = self.remote_subscribers.saturating_sub(1);
-
- if self.local_subscribers == 0 && self.remote_subscribers == 0 {
- self.info_log(ndb, "Locally unsubscribing");
- self.unsubscribe_local(ndb);
- self.unsubscribe_remote(ndb, pool);
- self.local_subscribers = 0;
- self.remote_subscribers = 0;
- true
- } else {
- false
- }
- }
-
- fn info_log(&self, ndb: &Ndb, msg: &str) {
- info!(
- "{msg}. {}/{}/{} active ndb/local/remote subscriptions.",
- ndb.subscription_count(),
- self.local_subscribers,
- self.remote_subscribers,
- );
- }
-
- fn err_log(&self, ndb: &Ndb, msg: &str) {
- error!(
- "{msg}. {}/{}/{} active ndb/local/remote subscriptions.",
- ndb.subscription_count(),
- self.local_subscribers,
- self.remote_subscribers,
- );
- }
-
- pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
- self.local_subscribers += 1;
- self.remote_subscribers += 1;
-
- if self.remote_subscribers == 1 {
- if self.remote_subid.is_some() {
- self.err_log(
- ndb,
- "Object is first subscriber, but it already had a subscription",
- );
- return;
- } else {
- let subid = Uuid::new_v4().to_string();
- pool.subscribe(subid.clone(), self.filters.clone());
- self.info_log(ndb, "First remote subscription");
- self.remote_subid = Some(subid);
- }
- }
-
- if self.local_subscribers == 1 {
- if self.local_subid.is_some() {
- self.err_log(ndb, "Should not have a local subscription already");
- return;
- }
-
- match ndb.subscribe(&self.filters) {
- Ok(sub) => {
- self.info_log(ndb, "First local subscription");
- self.local_subid = Some(sub);
- }
-
- Err(err) => {
- error!("multi_subscriber: error subscribing locally: '{err}'")
- }
- }
- }
- }
-}
+use crate::{subscriptions, timeline::ThreadSelection};
type RootNoteId = NoteId;
@@ -404,3 +263,240 @@ fn local_sub_new_scope(
1
}
+
+#[derive(Debug)]
+pub struct TimelineSub {
+ filter: Option<Vec<Filter>>,
+ state: SubState,
+}
+
+#[derive(Debug)]
+enum SubState {
+ NoSub {
+ dependers: usize,
+ },
+ LocalOnly {
+ local: Subscription,
+ dependers: usize,
+ },
+ RemoteOnly {
+ remote: String,
+ dependers: usize,
+ },
+ Unified {
+ unified: UnifiedSubscription,
+ dependers: usize,
+ },
+}
+
+impl Default for TimelineSub {
+ fn default() -> Self {
+ Self {
+ state: SubState::NoSub { dependers: 0 },
+ filter: None,
+ }
+ }
+}
+
+impl TimelineSub {
+ pub fn try_add_local(&mut self, ndb: &Ndb, filter: &[Filter]) {
+ match &mut self.state {
+ SubState::NoSub { dependers } => {
+ let Some(sub) = ndb_sub(ndb, filter, "") else {
+ return;
+ };
+
+ self.filter = Some(filter.to_owned());
+ self.state = SubState::LocalOnly {
+ local: sub,
+ dependers: *dependers,
+ }
+ }
+ SubState::LocalOnly {
+ local: _,
+ dependers: _,
+ } => {}
+ SubState::RemoteOnly { remote, dependers } => {
+ let Some(local) = ndb_sub(ndb, filter, "") else {
+ return;
+ };
+ self.state = SubState::Unified {
+ unified: UnifiedSubscription {
+ local,
+ remote: remote.to_owned(),
+ },
+ dependers: *dependers,
+ };
+ }
+ SubState::Unified {
+ unified: _,
+ dependers: _,
+ } => {}
+ }
+ }
+
+ pub fn force_add_remote(&mut self, subid: String) {
+ match &mut self.state {
+ SubState::NoSub { dependers } => {
+ self.state = SubState::RemoteOnly {
+ remote: subid,
+ dependers: *dependers,
+ }
+ }
+ SubState::LocalOnly { local, dependers } => {
+ self.state = SubState::Unified {
+ unified: UnifiedSubscription {
+ local: *local,
+ remote: subid,
+ },
+ dependers: *dependers,
+ }
+ }
+ SubState::RemoteOnly {
+ remote: _,
+ dependers: _,
+ } => {}
+ SubState::Unified {
+ unified: _,
+ dependers: _,
+ } => {}
+ }
+ }
+
+ pub fn try_add_remote(&mut self, pool: &mut RelayPool, filter: &Vec<Filter>) {
+ match &mut self.state {
+ SubState::NoSub { dependers } => {
+ let subid = subscriptions::new_sub_id();
+ pool.subscribe(subid.clone(), filter.clone());
+ self.filter = Some(filter.to_owned());
+ self.state = SubState::RemoteOnly {
+ remote: subid,
+ dependers: *dependers,
+ };
+ }
+ SubState::LocalOnly { local, dependers } => {
+ let subid = subscriptions::new_sub_id();
+ pool.subscribe(subid.clone(), filter.clone());
+ self.filter = Some(filter.to_owned());
+ self.state = SubState::Unified {
+ unified: UnifiedSubscription {
+ local: *local,
+ remote: subid,
+ },
+ dependers: *dependers,
+ }
+ }
+ SubState::RemoteOnly {
+ remote: _,
+ dependers: _,
+ } => {}
+ SubState::Unified {
+ unified: _,
+ dependers: _,
+ } => {}
+ }
+ }
+
+ pub fn increment(&mut self) {
+ match &mut self.state {
+ SubState::NoSub { dependers } => {
+ *dependers += 1;
+ }
+ SubState::LocalOnly {
+ local: _,
+ dependers,
+ } => {
+ *dependers += 1;
+ }
+ SubState::RemoteOnly {
+ remote: _,
+ dependers,
+ } => {
+ *dependers += 1;
+ }
+ SubState::Unified {
+ unified: _,
+ dependers,
+ } => {
+ *dependers += 1;
+ }
+ }
+ }
+
+ pub fn get_local(&self) -> Option<Subscription> {
+ match &self.state {
+ SubState::NoSub { dependers: _ } => None,
+ SubState::LocalOnly {
+ local,
+ dependers: _,
+ } => Some(*local),
+ SubState::RemoteOnly {
+ remote: _,
+ dependers: _,
+ } => None,
+ SubState::Unified {
+ unified,
+ dependers: _,
+ } => Some(unified.local),
+ }
+ }
+
+ pub fn unsubscribe_or_decrement(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
+ match &mut self.state {
+ SubState::NoSub { dependers } => {
+ *dependers -= 1;
+ }
+ SubState::LocalOnly { local, dependers } => {
+ if *dependers > 1 {
+ *dependers -= 1;
+ return;
+ }
+
+ if let Err(e) = ndb.unsubscribe(*local) {
+ tracing::error!("Could not unsub ndb: {e}");
+ return;
+ }
+
+ self.state = SubState::NoSub { dependers: 0 };
+ }
+ SubState::RemoteOnly { remote, dependers } => {
+ if *dependers > 1 {
+ *dependers -= 1;
+ return;
+ }
+
+ pool.unsubscribe(remote.to_owned());
+
+ self.state = SubState::NoSub { dependers: 0 };
+ }
+ SubState::Unified { unified, dependers } => {
+ if *dependers > 1 {
+ *dependers -= 1;
+ return;
+ }
+
+ pool.unsubscribe(unified.remote.to_owned());
+
+ if let Err(e) = ndb.unsubscribe(unified.local) {
+ tracing::error!("could not unsub ndb: {e}");
+ self.state = SubState::LocalOnly {
+ local: unified.local,
+ dependers: *dependers,
+ }
+ } else {
+ self.state = SubState::NoSub {
+ dependers: *dependers,
+ };
+ }
+ }
+ }
+ }
+
+ pub fn get_filter(&self) -> Option<&Vec<Filter>> {
+ self.filter.as_ref()
+ }
+
+ pub fn no_sub(&self) -> bool {
+ matches!(self.state, SubState::NoSub { dependers: _ })
+ }
+}
diff --git a/crates/notedeck_columns/src/timeline/cache.rs b/crates/notedeck_columns/src/timeline/cache.rs
@@ -1,7 +1,6 @@
use crate::{
actionbar::TimelineOpenResult,
error::Error,
- multi_subscriber::MultiSubscriber,
//subscriptions::SubRefs,
timeline::{Timeline, TimelineKind},
};
@@ -55,20 +54,17 @@ impl TimelineCache {
return Err(Error::TimelineNotFound);
};
- if let Some(sub) = &mut timeline.subscription {
- // if this is the last subscriber, remove the timeline from cache
- if sub.unsubscribe(ndb, pool) {
- debug!(
- "popped last timeline {:?}, removing from timeline cache",
- id
- );
- self.timelines.remove(id);
- }
+ timeline.subscription.unsubscribe_or_decrement(ndb, pool);
- Ok(())
- } else {
- Err(Error::MissingSubscription)
+ if timeline.subscription.no_sub() {
+ debug!(
+ "popped last timeline {:?}, removing from timeline cache",
+ id
+ );
+ self.timelines.remove(id);
}
+
+ Ok(())
}
fn get_expected_mut(&mut self, key: &TimelineKind) -> &mut Timeline {
@@ -158,7 +154,7 @@ impl TimelineCache {
// The timeline cache is stale, let's update it
let notes = find_new_notes(
timeline.all_or_any_notes(),
- timeline.subscription.as_ref().map(|s| &s.filters)?,
+ timeline.subscription.get_filter()?,
txn,
ndb,
);
@@ -180,14 +176,10 @@ impl TimelineCache {
Vitality::Fresh(timeline) => (None, timeline),
};
- if let Some(multi_sub) = &mut timeline.subscription {
- debug!("got open with *old* subscription for {:?}", &timeline.kind);
- multi_sub.subscribe(ndb, pool);
- } else if let Some(filter) = timeline.filter.get_any_ready() {
+ if let Some(filter) = timeline.filter.get_any_ready() {
debug!("got open with *new* subscription for {:?}", &timeline.kind);
- let mut multi_sub = MultiSubscriber::new(filter.clone());
- multi_sub.subscribe(ndb, pool);
- timeline.subscription = Some(multi_sub);
+ timeline.subscription.try_add_local(ndb, filter);
+ timeline.subscription.try_add_remote(pool, filter);
} else {
// This should never happen reasoning, self.notes would have
// failed above if the filter wasn't ready
diff --git a/crates/notedeck_columns/src/timeline/mod.rs b/crates/notedeck_columns/src/timeline/mod.rs
@@ -1,6 +1,6 @@
use crate::{
error::Error,
- multi_subscriber::MultiSubscriber,
+ multi_subscriber::TimelineSub,
subscriptions::{self, SubKind, Subscriptions},
timeline::kind::ListKind,
Result,
@@ -198,7 +198,7 @@ pub struct Timeline {
pub views: Vec<TimelineTab>,
pub selected_view: usize,
- pub subscription: Option<MultiSubscriber>,
+ pub subscription: TimelineSub,
}
impl Timeline {
@@ -257,7 +257,7 @@ impl Timeline {
pub fn new(kind: TimelineKind, filter_state: FilterState, views: Vec<TimelineTab>) -> Self {
let filter = FilterStates::new(filter_state);
- let subscription: Option<MultiSubscriber> = None;
+ let subscription = TimelineSub::default();
let selected_view = 0;
Timeline {
@@ -405,8 +405,7 @@ impl Timeline {
let sub = self
.subscription
- .as_ref()
- .and_then(|s| s.local_subid)
+ .get_local()
.ok_or(Error::App(notedeck::Error::no_active_sub()))?;
let new_note_ids = ndb.poll_for_notes(sub, 500);
@@ -613,19 +612,7 @@ fn setup_initial_timeline(
) -> Result<()> {
// some timelines are one-shot and a refreshed, like last_per_pubkey algo feed
if timeline.kind.should_subscribe_locally() {
- let local_sub = ndb.subscribe(filters)?;
- match &mut timeline.subscription {
- None => {
- timeline.subscription = Some(MultiSubscriber::with_initial_local_sub(
- local_sub,
- filters.to_vec(),
- ));
- }
-
- Some(msub) => {
- msub.local_subid = Some(local_sub);
- }
- };
+ timeline.subscription.try_add_local(ndb, filters);
}
debug!(