multi_subscriber.rs (17359B)
1 use egui_nav::ReturnType; 2 use enostr::{Filter, NoteId, RelayPool}; 3 use hashbrown::HashMap; 4 use nostrdb::{Ndb, Subscription}; 5 use notedeck::{filter::HybridFilter, UnifiedSubscription}; 6 use uuid::Uuid; 7 8 use crate::{subscriptions, timeline::ThreadSelection}; 9 10 type RootNoteId = NoteId; 11 12 #[derive(Default)] 13 pub struct ThreadSubs { 14 pub remotes: HashMap<RootNoteId, Remote>, 15 scopes: HashMap<MetaId, Vec<Scope>>, 16 } 17 18 // column id 19 type MetaId = usize; 20 21 pub struct Remote { 22 pub filter: Vec<Filter>, 23 subid: String, 24 dependers: usize, 25 } 26 27 impl std::fmt::Debug for Remote { 28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 29 f.debug_struct("Remote") 30 .field("subid", &self.subid) 31 .field("dependers", &self.dependers) 32 .finish() 33 } 34 } 35 36 struct Scope { 37 pub root_id: NoteId, 38 stack: Vec<Sub>, 39 } 40 41 pub struct Sub { 42 pub selected_id: NoteId, 43 pub sub: Subscription, 44 pub filter: Vec<Filter>, 45 } 46 47 impl ThreadSubs { 48 #[allow(clippy::too_many_arguments)] 49 pub fn subscribe( 50 &mut self, 51 ndb: &mut Ndb, 52 pool: &mut RelayPool, 53 meta_id: usize, 54 id: &ThreadSelection, 55 local_sub_filter: Vec<Filter>, 56 new_scope: bool, 57 remote_sub_filter: impl FnOnce() -> Vec<Filter>, 58 ) { 59 let cur_scopes = self.scopes.entry(meta_id).or_default(); 60 61 let new_subs = if new_scope || cur_scopes.is_empty() { 62 local_sub_new_scope(ndb, id, local_sub_filter, cur_scopes) 63 } else { 64 let cur_scope = cur_scopes.last_mut().expect("can't be empty"); 65 sub_current_scope(ndb, id, local_sub_filter, cur_scope) 66 }; 67 68 let remote = match self.remotes.raw_entry_mut().from_key(&id.root_id.bytes()) { 69 hashbrown::hash_map::RawEntryMut::Occupied(entry) => entry.into_mut(), 70 hashbrown::hash_map::RawEntryMut::Vacant(entry) => { 71 let (_, res) = entry.insert( 72 NoteId::new(*id.root_id.bytes()), 73 sub_remote(pool, remote_sub_filter, id), 74 ); 75 76 res 77 } 78 }; 79 80 remote.dependers = remote.dependers.saturating_add_signed(new_subs); 81 let num_dependers = remote.dependers; 82 tracing::debug!( 83 "Sub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}", 84 self.remotes.len(), 85 self.scopes.len(), 86 num_dependers, 87 ); 88 } 89 90 pub fn unsubscribe( 91 &mut self, 92 ndb: &mut Ndb, 93 pool: &mut RelayPool, 94 meta_id: usize, 95 id: &ThreadSelection, 96 return_type: ReturnType, 97 ) { 98 let Some(scopes) = self.scopes.get_mut(&meta_id) else { 99 return; 100 }; 101 102 let Some(remote) = self.remotes.get_mut(&id.root_id.bytes()) else { 103 tracing::error!("somehow we're unsubscribing but we don't have a remote"); 104 return; 105 }; 106 107 match return_type { 108 ReturnType::Drag => { 109 if let Some(scope) = scopes.last_mut() { 110 let Some(cur_sub) = scope.stack.pop() else { 111 tracing::error!("expected a scope to be left"); 112 return; 113 }; 114 115 if scope.root_id.bytes() != id.root_id.bytes() { 116 tracing::error!( 117 "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}", 118 scope.root_id.hex(), 119 id.root_id.bytes() 120 ); 121 } 122 123 if ndb_unsub(ndb, cur_sub.sub, id) { 124 remote.dependers = remote.dependers.saturating_sub(1); 125 } 126 127 if scope.stack.is_empty() { 128 scopes.pop(); 129 } 130 } 131 } 132 ReturnType::Click => { 133 let Some(scope) = scopes.pop() else { 134 tracing::error!("called unsubscribe but there aren't any scopes left"); 135 return; 136 }; 137 138 if scope.root_id.bytes() != id.root_id.bytes() { 139 tracing::error!( 140 "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}", 141 scope.root_id.hex(), 142 id.root_id.bytes() 143 ); 144 } 145 for sub in scope.stack { 146 if ndb_unsub(ndb, sub.sub, id) { 147 remote.dependers = remote.dependers.saturating_sub(1); 148 } 149 } 150 } 151 } 152 153 if scopes.is_empty() { 154 self.scopes.remove(&meta_id); 155 } 156 157 let num_dependers = remote.dependers; 158 159 if remote.dependers == 0 { 160 let remote = self 161 .remotes 162 .remove(&id.root_id.bytes()) 163 .expect("code above should guarentee existence"); 164 tracing::debug!("Remotely unsubscribed: {}", remote.subid); 165 pool.unsubscribe(remote.subid); 166 } 167 168 tracing::debug!( 169 "unsub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}", 170 self.remotes.len(), 171 self.scopes.len(), 172 num_dependers, 173 ); 174 } 175 176 pub fn get_local(&self, meta_id: usize) -> Option<&Sub> { 177 self.scopes 178 .get(&meta_id) 179 .as_ref() 180 .and_then(|s| s.last()) 181 .and_then(|s| s.stack.last()) 182 } 183 } 184 185 fn sub_current_scope( 186 ndb: &mut Ndb, 187 selection: &ThreadSelection, 188 local_sub_filter: Vec<Filter>, 189 cur_scope: &mut Scope, 190 ) -> isize { 191 let mut new_subs = 0; 192 193 if selection.root_id.bytes() != cur_scope.root_id.bytes() { 194 tracing::error!( 195 "Somehow the current scope's root is not equal to the selected note's root" 196 ); 197 } 198 199 if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) { 200 cur_scope.stack.push(Sub { 201 selected_id: NoteId::new(*selection.selected_or_root()), 202 sub, 203 filter: local_sub_filter, 204 }); 205 new_subs += 1; 206 } 207 208 new_subs 209 } 210 211 fn ndb_sub(ndb: &Ndb, filter: &[Filter], id: impl std::fmt::Debug) -> Option<Subscription> { 212 match ndb.subscribe(filter) { 213 Ok(s) => Some(s), 214 Err(e) => { 215 tracing::error!("Failed to get subscription for {:?}: {e}", id); 216 None 217 } 218 } 219 } 220 221 fn ndb_unsub(ndb: &mut Ndb, sub: Subscription, id: impl std::fmt::Debug) -> bool { 222 match ndb.unsubscribe(sub) { 223 Ok(_) => true, 224 Err(e) => { 225 tracing::error!("Failed to unsub {:?}: {e}", id); 226 false 227 } 228 } 229 } 230 231 fn sub_remote( 232 pool: &mut RelayPool, 233 remote_sub_filter: impl FnOnce() -> Vec<Filter>, 234 id: impl std::fmt::Debug, 235 ) -> Remote { 236 let subid = Uuid::new_v4().to_string(); 237 238 let filter = remote_sub_filter(); 239 240 let remote = Remote { 241 filter: filter.clone(), 242 subid: subid.clone(), 243 dependers: 0, 244 }; 245 246 tracing::debug!("Remote subscribe for {:?}", id); 247 248 pool.subscribe(subid, filter); 249 250 remote 251 } 252 253 fn local_sub_new_scope( 254 ndb: &mut Ndb, 255 id: &ThreadSelection, 256 local_sub_filter: Vec<Filter>, 257 scopes: &mut Vec<Scope>, 258 ) -> isize { 259 let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else { 260 return 0; 261 }; 262 263 scopes.push(Scope { 264 root_id: id.root_id.to_note_id(), 265 stack: vec![Sub { 266 selected_id: NoteId::new(*id.selected_or_root()), 267 sub, 268 filter: local_sub_filter, 269 }], 270 }); 271 272 1 273 } 274 275 #[derive(Debug)] 276 pub struct TimelineSub { 277 filter: Option<HybridFilter>, 278 state: SubState, 279 } 280 281 #[derive(Debug, Clone)] 282 enum SubState { 283 NoSub { 284 dependers: usize, 285 }, 286 LocalOnly { 287 local: Subscription, 288 dependers: usize, 289 }, 290 RemoteOnly { 291 remote: String, 292 dependers: usize, 293 }, 294 Unified { 295 unified: UnifiedSubscription, 296 dependers: usize, 297 }, 298 } 299 300 impl Default for TimelineSub { 301 fn default() -> Self { 302 Self { 303 state: SubState::NoSub { dependers: 0 }, 304 filter: None, 305 } 306 } 307 } 308 309 impl TimelineSub { 310 /// Reset the subscription state, properly unsubscribing from ndb and 311 /// relay pool before clearing. 312 /// 313 /// Used when the contact list changes and we need to rebuild the 314 /// timeline with a new filter. Preserves the depender count so that 315 /// shared subscription reference counting remains correct. 316 pub fn reset(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { 317 let before = self.state.clone(); 318 319 let dependers = match &self.state { 320 SubState::NoSub { dependers } => *dependers, 321 322 SubState::LocalOnly { local, dependers } => { 323 if let Err(e) = ndb.unsubscribe(*local) { 324 tracing::error!("TimelineSub::reset: failed to unsubscribe from ndb: {e}"); 325 } 326 *dependers 327 } 328 329 SubState::RemoteOnly { remote, dependers } => { 330 pool.unsubscribe(remote.to_owned()); 331 *dependers 332 } 333 334 SubState::Unified { unified, dependers } => { 335 pool.unsubscribe(unified.remote.to_owned()); 336 if let Err(e) = ndb.unsubscribe(unified.local) { 337 tracing::error!("TimelineSub::reset: failed to unsubscribe from ndb: {e}"); 338 } 339 *dependers 340 } 341 }; 342 343 self.state = SubState::NoSub { dependers }; 344 self.filter = None; 345 346 tracing::debug!("TimelineSub::reset: {:?} => {:?}", before, self.state); 347 } 348 349 pub fn try_add_local(&mut self, ndb: &Ndb, filter: &HybridFilter) { 350 let before = self.state.clone(); 351 match &mut self.state { 352 SubState::NoSub { dependers } => { 353 let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") else { 354 return; 355 }; 356 357 self.filter = Some(filter.to_owned()); 358 self.state = SubState::LocalOnly { 359 local: sub, 360 dependers: *dependers, 361 } 362 } 363 SubState::LocalOnly { 364 local: _, 365 dependers: _, 366 } => {} 367 SubState::RemoteOnly { remote, dependers } => { 368 let Some(local) = ndb_sub(ndb, &filter.local().combined(), "") else { 369 return; 370 }; 371 self.state = SubState::Unified { 372 unified: UnifiedSubscription { 373 local, 374 remote: remote.to_owned(), 375 }, 376 dependers: *dependers, 377 }; 378 } 379 SubState::Unified { 380 unified: _, 381 dependers: _, 382 } => {} 383 } 384 tracing::debug!( 385 "TimelineSub::try_add_local: {:?} => {:?}", 386 before, 387 self.state 388 ); 389 } 390 391 pub fn force_add_remote(&mut self, subid: String) { 392 let before = self.state.clone(); 393 match &mut self.state { 394 SubState::NoSub { dependers } => { 395 self.state = SubState::RemoteOnly { 396 remote: subid, 397 dependers: *dependers, 398 } 399 } 400 SubState::LocalOnly { local, dependers } => { 401 self.state = SubState::Unified { 402 unified: UnifiedSubscription { 403 local: *local, 404 remote: subid, 405 }, 406 dependers: *dependers, 407 } 408 } 409 SubState::RemoteOnly { 410 remote: _, 411 dependers: _, 412 } => {} 413 SubState::Unified { 414 unified: _, 415 dependers: _, 416 } => {} 417 } 418 tracing::debug!( 419 "TimelineSub::force_add_remote: {:?} => {:?}", 420 before, 421 self.state 422 ); 423 } 424 425 pub fn try_add_remote(&mut self, pool: &mut RelayPool, filter: &HybridFilter) { 426 let before = self.state.clone(); 427 match &mut self.state { 428 SubState::NoSub { dependers } => { 429 let subid = subscriptions::new_sub_id(); 430 pool.subscribe(subid.clone(), filter.remote().to_vec()); 431 self.filter = Some(filter.to_owned()); 432 self.state = SubState::RemoteOnly { 433 remote: subid, 434 dependers: *dependers, 435 }; 436 } 437 SubState::LocalOnly { local, dependers } => { 438 let subid = subscriptions::new_sub_id(); 439 pool.subscribe(subid.clone(), filter.remote().to_vec()); 440 self.filter = Some(filter.to_owned()); 441 self.state = SubState::Unified { 442 unified: UnifiedSubscription { 443 local: *local, 444 remote: subid, 445 }, 446 dependers: *dependers, 447 } 448 } 449 SubState::RemoteOnly { 450 remote: _, 451 dependers: _, 452 } => {} 453 SubState::Unified { 454 unified: _, 455 dependers: _, 456 } => {} 457 } 458 tracing::debug!( 459 "TimelineSub::try_add_remote: {:?} => {:?}", 460 before, 461 self.state 462 ); 463 } 464 465 pub fn increment(&mut self) { 466 let before = self.state.clone(); 467 match &mut self.state { 468 SubState::NoSub { dependers } => { 469 *dependers += 1; 470 } 471 SubState::LocalOnly { 472 local: _, 473 dependers, 474 } => { 475 *dependers += 1; 476 } 477 SubState::RemoteOnly { 478 remote: _, 479 dependers, 480 } => { 481 *dependers += 1; 482 } 483 SubState::Unified { 484 unified: _, 485 dependers, 486 } => { 487 *dependers += 1; 488 } 489 } 490 491 tracing::debug!("TimelineSub::increment: {:?} => {:?}", before, self.state); 492 } 493 494 pub fn get_local(&self) -> Option<Subscription> { 495 match &self.state { 496 SubState::NoSub { dependers: _ } => None, 497 SubState::LocalOnly { 498 local, 499 dependers: _, 500 } => Some(*local), 501 SubState::RemoteOnly { 502 remote: _, 503 dependers: _, 504 } => None, 505 SubState::Unified { 506 unified, 507 dependers: _, 508 } => Some(unified.local), 509 } 510 } 511 512 pub fn unsubscribe_or_decrement(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { 513 let before = self.state.clone(); 514 's: { 515 match &mut self.state { 516 SubState::NoSub { dependers } => *dependers = dependers.saturating_sub(1), 517 SubState::LocalOnly { local, dependers } => { 518 if *dependers > 1 { 519 *dependers = dependers.saturating_sub(1); 520 break 's; 521 } 522 523 if let Err(e) = ndb.unsubscribe(*local) { 524 tracing::error!("Could not unsub ndb: {e}"); 525 break 's; 526 } 527 528 self.state = SubState::NoSub { dependers: 0 }; 529 } 530 SubState::RemoteOnly { remote, dependers } => { 531 if *dependers > 1 { 532 *dependers = dependers.saturating_sub(1); 533 break 's; 534 } 535 536 pool.unsubscribe(remote.to_owned()); 537 538 self.state = SubState::NoSub { dependers: 0 }; 539 } 540 SubState::Unified { unified, dependers } => { 541 if *dependers > 1 { 542 *dependers = dependers.saturating_sub(1); 543 break 's; 544 } 545 546 pool.unsubscribe(unified.remote.to_owned()); 547 548 if let Err(e) = ndb.unsubscribe(unified.local) { 549 tracing::error!("could not unsub ndb: {e}"); 550 self.state = SubState::LocalOnly { 551 local: unified.local, 552 dependers: *dependers, 553 } 554 } else { 555 self.state = SubState::NoSub { dependers: 0 }; 556 } 557 } 558 } 559 } 560 tracing::debug!( 561 "TimelineSub::unsubscribe_or_decrement: {:?} => {:?}", 562 before, 563 self.state 564 ); 565 } 566 567 pub fn get_filter(&self) -> Option<&HybridFilter> { 568 self.filter.as_ref() 569 } 570 571 pub fn no_sub(&self) -> bool { 572 matches!(self.state, SubState::NoSub { dependers: _ }) 573 } 574 }