multi_subscriber.rs (15852B)
1 use egui_nav::ReturnType; 2 use enostr::{Filter, NoteId, RelayPool}; 3 use hashbrown::HashMap; 4 use nostrdb::{Ndb, Subscription}; 5 use notedeck::{filter::HybridFilter, UnifiedSubscription}; 6 use uuid::Uuid; 7 8 use crate::{subscriptions, timeline::ThreadSelection}; 9 10 type RootNoteId = NoteId; 11 12 #[derive(Default)] 13 pub struct ThreadSubs { 14 pub remotes: HashMap<RootNoteId, Remote>, 15 scopes: HashMap<MetaId, Vec<Scope>>, 16 } 17 18 // column id 19 type MetaId = usize; 20 21 pub struct Remote { 22 pub filter: Vec<Filter>, 23 subid: String, 24 dependers: usize, 25 } 26 27 impl std::fmt::Debug for Remote { 28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 29 f.debug_struct("Remote") 30 .field("subid", &self.subid) 31 .field("dependers", &self.dependers) 32 .finish() 33 } 34 } 35 36 struct Scope { 37 pub root_id: NoteId, 38 stack: Vec<Sub>, 39 } 40 41 pub struct Sub { 42 pub selected_id: NoteId, 43 pub sub: Subscription, 44 pub filter: Vec<Filter>, 45 } 46 47 impl ThreadSubs { 48 #[allow(clippy::too_many_arguments)] 49 pub fn subscribe( 50 &mut self, 51 ndb: &mut Ndb, 52 pool: &mut RelayPool, 53 meta_id: usize, 54 id: &ThreadSelection, 55 local_sub_filter: Vec<Filter>, 56 new_scope: bool, 57 remote_sub_filter: impl FnOnce() -> Vec<Filter>, 58 ) { 59 let cur_scopes = self.scopes.entry(meta_id).or_default(); 60 61 let new_subs = if new_scope || cur_scopes.is_empty() { 62 local_sub_new_scope(ndb, id, local_sub_filter, cur_scopes) 63 } else { 64 let cur_scope = cur_scopes.last_mut().expect("can't be empty"); 65 sub_current_scope(ndb, id, local_sub_filter, cur_scope) 66 }; 67 68 let remote = match self.remotes.raw_entry_mut().from_key(&id.root_id.bytes()) { 69 hashbrown::hash_map::RawEntryMut::Occupied(entry) => entry.into_mut(), 70 hashbrown::hash_map::RawEntryMut::Vacant(entry) => { 71 let (_, res) = entry.insert( 72 NoteId::new(*id.root_id.bytes()), 73 sub_remote(pool, remote_sub_filter, id), 74 ); 75 76 res 77 } 78 }; 79 80 remote.dependers = remote.dependers.saturating_add_signed(new_subs); 81 let num_dependers = remote.dependers; 82 tracing::debug!( 83 "Sub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}", 84 self.remotes.len(), 85 self.scopes.len(), 86 num_dependers, 87 ); 88 } 89 90 pub fn unsubscribe( 91 &mut self, 92 ndb: &mut Ndb, 93 pool: &mut RelayPool, 94 meta_id: usize, 95 id: &ThreadSelection, 96 return_type: ReturnType, 97 ) { 98 let Some(scopes) = self.scopes.get_mut(&meta_id) else { 99 return; 100 }; 101 102 let Some(remote) = self.remotes.get_mut(&id.root_id.bytes()) else { 103 tracing::error!("somehow we're unsubscribing but we don't have a remote"); 104 return; 105 }; 106 107 match return_type { 108 ReturnType::Drag => { 109 if let Some(scope) = scopes.last_mut() { 110 let Some(cur_sub) = scope.stack.pop() else { 111 tracing::error!("expected a scope to be left"); 112 return; 113 }; 114 115 if scope.root_id.bytes() != id.root_id.bytes() { 116 tracing::error!( 117 "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}", 118 scope.root_id.hex(), 119 id.root_id.bytes() 120 ); 121 } 122 123 if ndb_unsub(ndb, cur_sub.sub, id) { 124 remote.dependers = remote.dependers.saturating_sub(1); 125 } 126 127 if scope.stack.is_empty() { 128 scopes.pop(); 129 } 130 } 131 } 132 ReturnType::Click => { 133 let Some(scope) = scopes.pop() else { 134 tracing::error!("called unsubscribe but there aren't any scopes left"); 135 return; 136 }; 137 138 if scope.root_id.bytes() != id.root_id.bytes() { 139 tracing::error!( 140 "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}", 141 scope.root_id.hex(), 142 id.root_id.bytes() 143 ); 144 } 145 for sub in scope.stack { 146 if ndb_unsub(ndb, sub.sub, id) { 147 remote.dependers = remote.dependers.saturating_sub(1); 148 } 149 } 150 } 151 } 152 153 if scopes.is_empty() { 154 self.scopes.remove(&meta_id); 155 } 156 157 let num_dependers = remote.dependers; 158 159 if remote.dependers == 0 { 160 let remote = self 161 .remotes 162 .remove(&id.root_id.bytes()) 163 .expect("code above should guarentee existence"); 164 tracing::debug!("Remotely unsubscribed: {}", remote.subid); 165 pool.unsubscribe(remote.subid); 166 } 167 168 tracing::debug!( 169 "unsub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}", 170 self.remotes.len(), 171 self.scopes.len(), 172 num_dependers, 173 ); 174 } 175 176 pub fn get_local(&self, meta_id: usize) -> Option<&Sub> { 177 self.scopes 178 .get(&meta_id) 179 .as_ref() 180 .and_then(|s| s.last()) 181 .and_then(|s| s.stack.last()) 182 } 183 } 184 185 fn sub_current_scope( 186 ndb: &mut Ndb, 187 selection: &ThreadSelection, 188 local_sub_filter: Vec<Filter>, 189 cur_scope: &mut Scope, 190 ) -> isize { 191 let mut new_subs = 0; 192 193 if selection.root_id.bytes() != cur_scope.root_id.bytes() { 194 tracing::error!( 195 "Somehow the current scope's root is not equal to the selected note's root" 196 ); 197 } 198 199 if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) { 200 cur_scope.stack.push(Sub { 201 selected_id: NoteId::new(*selection.selected_or_root()), 202 sub, 203 filter: local_sub_filter, 204 }); 205 new_subs += 1; 206 } 207 208 new_subs 209 } 210 211 fn ndb_sub(ndb: &Ndb, filter: &[Filter], id: impl std::fmt::Debug) -> Option<Subscription> { 212 match ndb.subscribe(filter) { 213 Ok(s) => Some(s), 214 Err(e) => { 215 tracing::error!("Failed to get subscription for {:?}: {e}", id); 216 None 217 } 218 } 219 } 220 221 fn ndb_unsub(ndb: &mut Ndb, sub: Subscription, id: impl std::fmt::Debug) -> bool { 222 match ndb.unsubscribe(sub) { 223 Ok(_) => true, 224 Err(e) => { 225 tracing::error!("Failed to unsub {:?}: {e}", id); 226 false 227 } 228 } 229 } 230 231 fn sub_remote( 232 pool: &mut RelayPool, 233 remote_sub_filter: impl FnOnce() -> Vec<Filter>, 234 id: impl std::fmt::Debug, 235 ) -> Remote { 236 let subid = Uuid::new_v4().to_string(); 237 238 let filter = remote_sub_filter(); 239 240 let remote = Remote { 241 filter: filter.clone(), 242 subid: subid.clone(), 243 dependers: 0, 244 }; 245 246 tracing::debug!("Remote subscribe for {:?}", id); 247 248 pool.subscribe(subid, filter); 249 250 remote 251 } 252 253 fn local_sub_new_scope( 254 ndb: &mut Ndb, 255 id: &ThreadSelection, 256 local_sub_filter: Vec<Filter>, 257 scopes: &mut Vec<Scope>, 258 ) -> isize { 259 let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else { 260 return 0; 261 }; 262 263 scopes.push(Scope { 264 root_id: id.root_id.to_note_id(), 265 stack: vec![Sub { 266 selected_id: NoteId::new(*id.selected_or_root()), 267 sub, 268 filter: local_sub_filter, 269 }], 270 }); 271 272 1 273 } 274 275 #[derive(Debug)] 276 pub struct TimelineSub { 277 filter: Option<HybridFilter>, 278 state: SubState, 279 } 280 281 #[derive(Debug, Clone)] 282 enum SubState { 283 NoSub { 284 dependers: usize, 285 }, 286 LocalOnly { 287 local: Subscription, 288 dependers: usize, 289 }, 290 RemoteOnly { 291 remote: String, 292 dependers: usize, 293 }, 294 Unified { 295 unified: UnifiedSubscription, 296 dependers: usize, 297 }, 298 } 299 300 impl Default for TimelineSub { 301 fn default() -> Self { 302 Self { 303 state: SubState::NoSub { dependers: 0 }, 304 filter: None, 305 } 306 } 307 } 308 309 impl TimelineSub { 310 pub fn try_add_local(&mut self, ndb: &Ndb, filter: &HybridFilter) { 311 let before = self.state.clone(); 312 match &mut self.state { 313 SubState::NoSub { dependers } => { 314 let Some(sub) = ndb_sub(ndb, filter.local(), "") else { 315 return; 316 }; 317 318 self.filter = Some(filter.to_owned()); 319 self.state = SubState::LocalOnly { 320 local: sub, 321 dependers: *dependers, 322 } 323 } 324 SubState::LocalOnly { 325 local: _, 326 dependers: _, 327 } => {} 328 SubState::RemoteOnly { remote, dependers } => { 329 let Some(local) = ndb_sub(ndb, filter.local(), "") else { 330 return; 331 }; 332 self.state = SubState::Unified { 333 unified: UnifiedSubscription { 334 local, 335 remote: remote.to_owned(), 336 }, 337 dependers: *dependers, 338 }; 339 } 340 SubState::Unified { 341 unified: _, 342 dependers: _, 343 } => {} 344 } 345 tracing::debug!( 346 "TimelineSub::try_add_local: {:?} => {:?}", 347 before, 348 self.state 349 ); 350 } 351 352 pub fn force_add_remote(&mut self, subid: String) { 353 let before = self.state.clone(); 354 match &mut self.state { 355 SubState::NoSub { dependers } => { 356 self.state = SubState::RemoteOnly { 357 remote: subid, 358 dependers: *dependers, 359 } 360 } 361 SubState::LocalOnly { local, dependers } => { 362 self.state = SubState::Unified { 363 unified: UnifiedSubscription { 364 local: *local, 365 remote: subid, 366 }, 367 dependers: *dependers, 368 } 369 } 370 SubState::RemoteOnly { 371 remote: _, 372 dependers: _, 373 } => {} 374 SubState::Unified { 375 unified: _, 376 dependers: _, 377 } => {} 378 } 379 tracing::debug!( 380 "TimelineSub::force_add_remote: {:?} => {:?}", 381 before, 382 self.state 383 ); 384 } 385 386 pub fn try_add_remote(&mut self, pool: &mut RelayPool, filter: &HybridFilter) { 387 let before = self.state.clone(); 388 match &mut self.state { 389 SubState::NoSub { dependers } => { 390 let subid = subscriptions::new_sub_id(); 391 pool.subscribe(subid.clone(), filter.remote().to_vec()); 392 self.filter = Some(filter.to_owned()); 393 self.state = SubState::RemoteOnly { 394 remote: subid, 395 dependers: *dependers, 396 }; 397 } 398 SubState::LocalOnly { local, dependers } => { 399 let subid = subscriptions::new_sub_id(); 400 pool.subscribe(subid.clone(), filter.remote().to_vec()); 401 self.filter = Some(filter.to_owned()); 402 self.state = SubState::Unified { 403 unified: UnifiedSubscription { 404 local: *local, 405 remote: subid, 406 }, 407 dependers: *dependers, 408 } 409 } 410 SubState::RemoteOnly { 411 remote: _, 412 dependers: _, 413 } => {} 414 SubState::Unified { 415 unified: _, 416 dependers: _, 417 } => {} 418 } 419 tracing::debug!( 420 "TimelineSub::try_add_remote: {:?} => {:?}", 421 before, 422 self.state 423 ); 424 } 425 426 pub fn increment(&mut self) { 427 let before = self.state.clone(); 428 match &mut self.state { 429 SubState::NoSub { dependers } => { 430 *dependers += 1; 431 } 432 SubState::LocalOnly { 433 local: _, 434 dependers, 435 } => { 436 *dependers += 1; 437 } 438 SubState::RemoteOnly { 439 remote: _, 440 dependers, 441 } => { 442 *dependers += 1; 443 } 444 SubState::Unified { 445 unified: _, 446 dependers, 447 } => { 448 *dependers += 1; 449 } 450 } 451 452 tracing::debug!("TimelineSub::increment: {:?} => {:?}", before, self.state); 453 } 454 455 pub fn get_local(&self) -> Option<Subscription> { 456 match &self.state { 457 SubState::NoSub { dependers: _ } => None, 458 SubState::LocalOnly { 459 local, 460 dependers: _, 461 } => Some(*local), 462 SubState::RemoteOnly { 463 remote: _, 464 dependers: _, 465 } => None, 466 SubState::Unified { 467 unified, 468 dependers: _, 469 } => Some(unified.local), 470 } 471 } 472 473 pub fn unsubscribe_or_decrement(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { 474 let before = self.state.clone(); 475 's: { 476 match &mut self.state { 477 SubState::NoSub { dependers } => *dependers = dependers.saturating_sub(1), 478 SubState::LocalOnly { local, dependers } => { 479 if *dependers > 1 { 480 *dependers = dependers.saturating_sub(1); 481 break 's; 482 } 483 484 if let Err(e) = ndb.unsubscribe(*local) { 485 tracing::error!("Could not unsub ndb: {e}"); 486 break 's; 487 } 488 489 self.state = SubState::NoSub { dependers: 0 }; 490 } 491 SubState::RemoteOnly { remote, dependers } => { 492 if *dependers > 1 { 493 *dependers = dependers.saturating_sub(1); 494 break 's; 495 } 496 497 pool.unsubscribe(remote.to_owned()); 498 499 self.state = SubState::NoSub { dependers: 0 }; 500 } 501 SubState::Unified { unified, dependers } => { 502 if *dependers > 1 { 503 *dependers = dependers.saturating_sub(1); 504 break 's; 505 } 506 507 pool.unsubscribe(unified.remote.to_owned()); 508 509 if let Err(e) = ndb.unsubscribe(unified.local) { 510 tracing::error!("could not unsub ndb: {e}"); 511 self.state = SubState::LocalOnly { 512 local: unified.local, 513 dependers: *dependers, 514 } 515 } else { 516 self.state = SubState::NoSub { dependers: 0 }; 517 } 518 } 519 } 520 } 521 tracing::debug!( 522 "TimelineSub::unsubscribe_or_decrement: {:?} => {:?}", 523 before, 524 self.state 525 ); 526 } 527 528 pub fn get_filter(&self) -> Option<&HybridFilter> { 529 self.filter.as_ref() 530 } 531 532 pub fn no_sub(&self) -> bool { 533 matches!(self.state, SubState::NoSub { dependers: _ }) 534 } 535 }