session.rs (16397B)
1 use hashbrown::{hash_map::Entry, HashMap, HashSet}; 2 use nostrdb::Filter; 3 4 use crate::relay::{ 5 FullModificationTask, ModifyFiltersTask, ModifyRelaysTask, ModifyTask, NormRelayUrl, 6 OutboxSubId, OutboxTask, RelayUrlPkgs, SubscribeTask, 7 }; 8 9 /// OutboxSession records subscription mutations for the current frame before they 10 /// are applied to the relay coordinators. 11 #[derive(Default)] 12 pub struct OutboxSession { 13 pub tasks: HashMap<OutboxSubId, OutboxTask>, 14 } 15 16 impl OutboxSession { 17 #[profiling::function] 18 pub fn new_filters(&mut self, id: OutboxSubId, mut new_filters: Vec<Filter>) { 19 filters_prune_empty(&mut new_filters); 20 if new_filters.is_empty() { 21 self.unsubscribe(id); 22 return; 23 } 24 25 let entry = self.tasks.entry(id); 26 27 let mut entry = match entry { 28 Entry::Occupied(occupied_entry) => { 29 if matches!(occupied_entry.get(), OutboxTask::Oneshot(_)) { 30 // we don't modify oneshots 31 return; 32 } 33 occupied_entry 34 } 35 Entry::Vacant(vacant_entry) => { 36 vacant_entry.insert(OutboxTask::Modify(ModifyTask::Filters(ModifyFiltersTask( 37 new_filters, 38 )))); 39 return; 40 } 41 }; 42 43 match entry.get_mut() { 44 OutboxTask::Modify(modify) => match modify { 45 ModifyTask::Filters(_) => { 46 self.tasks.insert( 47 id, 48 OutboxTask::Modify(ModifyTask::Filters(ModifyFiltersTask(new_filters))), 49 ); 50 } 51 ModifyTask::Relays(modify_relays_task) => { 52 let relays = std::mem::take(&mut modify_relays_task.0); 53 *entry.get_mut() = OutboxTask::Modify(ModifyTask::Full(FullModificationTask { 54 filters: new_filters, 55 relays, 56 })); 57 } 58 ModifyTask::Full(full) => { 59 full.filters = new_filters; 60 } 61 }, 62 OutboxTask::Unsubscribe => { 63 self.tasks.insert( 64 id, 65 OutboxTask::Modify(ModifyTask::Filters(ModifyFiltersTask(new_filters))), 66 ); 67 } 68 OutboxTask::Oneshot(oneshot) => { 69 oneshot.filters = new_filters; 70 } 71 OutboxTask::Subscribe(subscribe_task) => { 72 subscribe_task.filters = new_filters; 73 } 74 } 75 } 76 #[profiling::function] 77 pub fn new_relays(&mut self, id: OutboxSubId, new_urls: HashSet<NormRelayUrl>) { 78 let entry = self.tasks.entry(id); 79 80 let mut entry = match entry { 81 Entry::Occupied(occupied_entry) => { 82 let task = occupied_entry.get(); 83 84 if matches!(task, OutboxTask::Oneshot(_)) { 85 // we don't modify oneshots 86 return; 87 } 88 89 occupied_entry 90 } 91 Entry::Vacant(vacant_entry) => { 92 vacant_entry.insert(OutboxTask::Modify(ModifyTask::Relays(ModifyRelaysTask( 93 new_urls, 94 )))); 95 return; 96 } 97 }; 98 99 match entry.get_mut() { 100 OutboxTask::Modify(modify) => { 101 match modify { 102 ModifyTask::Filters(modify_filters_task) => { 103 let filters = std::mem::take(&mut modify_filters_task.0); // moves out, leaves empty/default 104 *entry.get_mut() = 105 OutboxTask::Modify(ModifyTask::Full(FullModificationTask { 106 filters, 107 relays: new_urls, 108 })); 109 } 110 ModifyTask::Relays(_) => { 111 self.tasks.insert( 112 id, 113 OutboxTask::Modify(ModifyTask::Relays(ModifyRelaysTask(new_urls))), 114 ); 115 } 116 ModifyTask::Full(full_modification_task) => { 117 full_modification_task.relays = new_urls; 118 } 119 } 120 } 121 OutboxTask::Unsubscribe => { 122 self.tasks.insert( 123 id, 124 OutboxTask::Modify(ModifyTask::Relays(ModifyRelaysTask(new_urls))), 125 ); 126 } 127 OutboxTask::Oneshot(oneshot) => { 128 oneshot.relays.urls = new_urls; 129 } 130 OutboxTask::Subscribe(subscribe_task) => { 131 subscribe_task.relays.urls = new_urls; 132 } 133 } 134 } 135 136 pub fn subscribe(&mut self, id: OutboxSubId, mut filters: Vec<Filter>, urls: RelayUrlPkgs) { 137 filters_prune_empty(&mut filters); 138 if filters.is_empty() { 139 return; 140 } 141 142 self.tasks.insert( 143 id, 144 OutboxTask::Subscribe(SubscribeTask { 145 filters, 146 relays: urls, 147 }), 148 ); 149 } 150 151 pub fn oneshot(&mut self, id: OutboxSubId, mut filters: Vec<Filter>, urls: RelayUrlPkgs) { 152 filters_prune_empty(&mut filters); 153 if filters.is_empty() { 154 return; 155 } 156 157 self.tasks.insert( 158 id, 159 OutboxTask::Oneshot(SubscribeTask { 160 filters, 161 relays: urls, 162 }), 163 ); 164 } 165 166 pub fn unsubscribe(&mut self, id: OutboxSubId) { 167 self.tasks.insert(id, OutboxTask::Unsubscribe); 168 } 169 } 170 171 fn filters_prune_empty(filters: &mut Vec<Filter>) { 172 filters.retain(|f| f.num_elements() != 0); 173 } 174 175 #[cfg(test)] 176 mod tests { 177 use crate::relay::test_utils::{expect_task, trivial_filter}; 178 179 use super::*; 180 181 // ==================== OutboxSession tests ==================== 182 183 /// Verifies a freshly created session has no pending tasks. 184 #[test] 185 fn outbox_session_default_empty() { 186 let session = OutboxSession::default(); 187 assert!(session.tasks.is_empty()); 188 } 189 190 /// Drops subscribe/oneshot requests that lack meaningful filters/relays. 191 #[test] 192 fn outbox_session_subscribe_empty() { 193 let mut session = OutboxSession::default(); 194 let urls = RelayUrlPkgs::new(HashSet::new()); 195 196 session.subscribe(OutboxSubId(0), vec![Filter::new().build()], urls.clone()); 197 assert!(session.tasks.is_empty()); 198 199 session.subscribe(OutboxSubId(0), vec![], urls.clone()); 200 assert!(session.tasks.is_empty()); 201 202 session.oneshot(OutboxSubId(0), vec![Filter::new().build()], urls.clone()); 203 assert!(session.tasks.is_empty()); 204 205 session.oneshot(OutboxSubId(0), vec![], urls); 206 assert!(session.tasks.is_empty()); 207 } 208 209 /// Stores subscribe tasks when filters and relays are provided. 210 #[test] 211 fn outbox_session_subscribe() { 212 let mut session = OutboxSession::default(); 213 let urls = RelayUrlPkgs::new(HashSet::new()); 214 215 session.subscribe(OutboxSubId(0), trivial_filter(), urls); 216 217 assert!(matches!( 218 expect_task(&session, OutboxSubId(0)), 219 OutboxTask::Subscribe(_) 220 )); 221 } 222 223 /// Stores oneshot tasks when filters and relays are provided. 224 #[test] 225 fn outbox_session_oneshot() { 226 let mut session = OutboxSession::default(); 227 let urls = RelayUrlPkgs::new(HashSet::new()); 228 229 session.oneshot(OutboxSubId(0), trivial_filter(), urls); 230 231 assert!(matches!( 232 expect_task(&session, OutboxSubId(0)), 233 OutboxTask::Oneshot(_) 234 )); 235 } 236 237 /// Records unsubscribe operations on demand. 238 #[test] 239 fn outbox_session_unsubscribe() { 240 let mut session = OutboxSession::default(); 241 242 session.unsubscribe(OutboxSubId(42)); 243 244 assert!(matches!( 245 expect_task(&session, OutboxSubId(42)), 246 OutboxTask::Unsubscribe 247 )); 248 } 249 250 /// Pushing filters first results in a Modify(Filters) task. 251 #[test] 252 fn outbox_session_new_filters_creates_modify_filters() { 253 let mut session = OutboxSession::default(); 254 255 session.new_filters(OutboxSubId(0), trivial_filter()); 256 257 assert!(matches!( 258 expect_task(&session, OutboxSubId(0)), 259 OutboxTask::Modify(ModifyTask::Filters(_)) 260 )); 261 } 262 263 /// Pushing relays first results in a Modify(Relays) task. 264 #[test] 265 fn outbox_session_new_relays_creates_modify_relays() { 266 let mut session = OutboxSession::default(); 267 268 session.new_relays(OutboxSubId(0), HashSet::new()); 269 270 assert!(matches!( 271 expect_task(&session, OutboxSubId(0)), 272 OutboxTask::Modify(ModifyTask::Relays(_)) 273 )); 274 } 275 276 /// Mixing filters then relays converges to a Modify(Full) task. 277 #[test] 278 fn outbox_session_merges_filters_and_relays_to_full_modification() { 279 let mut session = OutboxSession::default(); 280 281 // First add filters 282 session.new_filters(OutboxSubId(0), trivial_filter()); 283 284 // Then add relays - should merge to Full modification 285 session.new_relays(OutboxSubId(0), HashSet::new()); 286 287 assert!(matches!( 288 expect_task(&session, OutboxSubId(0)), 289 OutboxTask::Modify(ModifyTask::Full(_)) 290 )); 291 } 292 293 /// Mixing relays then filters also converges to a Modify(Full) task. 294 #[test] 295 fn outbox_session_merges_relays_and_filters_to_full_modification() { 296 let mut session = OutboxSession::default(); 297 298 // First add relays 299 session.new_relays(OutboxSubId(0), HashSet::new()); 300 301 // Then add filters - should merge to Full modification 302 session.new_filters(OutboxSubId(0), trivial_filter()); 303 304 assert!(matches!( 305 expect_task(&session, OutboxSubId(0)), 306 OutboxTask::Modify(ModifyTask::Full(_)) 307 )); 308 } 309 310 // this should never happen in practice though 311 /// Subscribe commands override previously staged filter changes. 312 #[test] 313 fn outbox_session_subscribe_overwrites_modify_filters() { 314 let mut session = OutboxSession::default(); 315 let urls = RelayUrlPkgs::new(HashSet::new()); 316 317 session.new_filters(OutboxSubId(0), trivial_filter()); 318 session.subscribe( 319 OutboxSubId(0), 320 vec![Filter::new().kinds(vec![3]).build()], 321 urls, 322 ); 323 324 assert!(matches!( 325 expect_task(&session, OutboxSubId(0)), 326 OutboxTask::Subscribe(_) 327 )); 328 } 329 330 /// Unsubscribe issued after subscribe should take precedence. 331 #[test] 332 fn outbox_session_unsubscribe_after_subscribe() { 333 let mut session = OutboxSession::default(); 334 let urls = RelayUrlPkgs::new(HashSet::new()); 335 336 session.subscribe(OutboxSubId(0), trivial_filter(), urls); 337 session.unsubscribe(OutboxSubId(0)); 338 339 assert!(matches!( 340 expect_task(&session, OutboxSubId(0)), 341 OutboxTask::Unsubscribe 342 )); 343 } 344 345 /// Adding filters after an unsubscribe restarts the task as Modify(Filters). 346 #[test] 347 fn outbox_session_new_filters_after_unsubscribe() { 348 let mut session = OutboxSession::default(); 349 350 session.unsubscribe(OutboxSubId(0)); 351 session.new_filters(OutboxSubId(0), trivial_filter()); 352 353 // Filters should overwrite unsubscribe 354 assert!(matches!( 355 expect_task(&session, OutboxSubId(0)), 356 OutboxTask::Modify(ModifyTask::Filters(_)) 357 )); 358 } 359 360 /// Updating filters of a Full modification replaces its filter list. 361 #[test] 362 fn outbox_session_update_full_modification_filters() { 363 let mut session = OutboxSession::default(); 364 365 // Create full modification 366 session.new_filters(OutboxSubId(0), trivial_filter()); 367 session.new_relays(OutboxSubId(0), HashSet::new()); 368 369 // Update filters on the full modification 370 session.new_filters( 371 OutboxSubId(0), 372 vec![ 373 Filter::new().kinds(vec![3]).build(), 374 Filter::new().kinds(vec![1]).build(), 375 ], 376 ); 377 378 match expect_task(&session, OutboxSubId(0)) { 379 OutboxTask::Modify(ModifyTask::Full(fm)) => { 380 assert_eq!(fm.filters.len(), 2); 381 } 382 _ => panic!("Expected Modify(Full)"), 383 } 384 } 385 386 /// Updating relays of a Full modification replaces its relay set. 387 #[test] 388 fn outbox_session_update_full_modification_relays() { 389 let mut session = OutboxSession::default(); 390 391 // Create full modification 392 session.new_filters(OutboxSubId(0), trivial_filter()); 393 session.new_relays(OutboxSubId(0), HashSet::new()); 394 395 // Update relays on the full modification 396 let mut new_urls = HashSet::new(); 397 new_urls.insert(NormRelayUrl::new("wss://relay.example.com").unwrap()); 398 session.new_relays(OutboxSubId(0), new_urls); 399 400 match expect_task(&session, OutboxSubId(0)) { 401 OutboxTask::Modify(ModifyTask::Full(fm)) => { 402 assert!(!fm.relays.is_empty()); 403 } 404 _ => panic!("Expected Modify(Full)"), 405 } 406 } 407 408 /// Attempting to modify oneshot filters leaves them unchanged. 409 #[test] 410 fn outbox_session_update_oneshot_filters() { 411 let mut session = OutboxSession::default(); 412 let urls = RelayUrlPkgs::new(HashSet::new()); 413 414 session.oneshot(OutboxSubId(0), trivial_filter(), urls); 415 session.new_filters( 416 OutboxSubId(0), 417 vec![ 418 Filter::new().kinds([1]).build(), 419 Filter::new().kinds([3]).build(), 420 ], 421 ); 422 423 match expect_task(&session, OutboxSubId(0)) { 424 OutboxTask::Oneshot(task) => { 425 assert_eq!(task.filters.len(), 1); 426 } 427 _ => panic!("Expected Oneshot task"), 428 } 429 } 430 431 /// Updating filters on a Subscribe task replaces the stored filters. 432 #[test] 433 fn outbox_session_update_subscribe_filters() { 434 let mut session = OutboxSession::default(); 435 let urls = RelayUrlPkgs::new(HashSet::new()); 436 437 session.subscribe(OutboxSubId(0), trivial_filter(), urls); 438 session.new_filters( 439 OutboxSubId(0), 440 vec![ 441 Filter::new().kinds([1]).build(), 442 Filter::new().kinds([3]).build(), 443 ], 444 ); 445 446 match expect_task(&session, OutboxSubId(0)) { 447 OutboxTask::Subscribe(task) => { 448 assert_eq!(task.filters.len(), 2); 449 } 450 _ => panic!("Expected Subscribe task"), 451 } 452 } 453 454 /// Updating relays on a Subscribe task replaces the stored relays. 455 #[test] 456 fn outbox_session_update_subscribe_relays() { 457 let mut session = OutboxSession::default(); 458 let urls = RelayUrlPkgs::new(HashSet::new()); 459 460 session.subscribe(OutboxSubId(0), trivial_filter(), urls); 461 462 let mut new_urls = HashSet::new(); 463 new_urls.insert(NormRelayUrl::new("wss://relay.example.com").unwrap()); 464 session.new_relays(OutboxSubId(0), new_urls); 465 466 match expect_task(&session, OutboxSubId(0)) { 467 OutboxTask::Subscribe(task) => { 468 assert!(!task.relays.urls.is_empty()); 469 } 470 _ => panic!("Expected Subscribe task"), 471 } 472 } 473 474 /// Attempting to modify oneshot relays leaves them unchanged. 475 #[test] 476 fn outbox_session_update_oneshot_relays() { 477 let mut session = OutboxSession::default(); 478 let urls = RelayUrlPkgs::new(HashSet::new()); 479 480 session.oneshot(OutboxSubId(0), trivial_filter(), urls); 481 482 let mut new_urls = HashSet::new(); 483 new_urls.insert(NormRelayUrl::new("wss://relay.example.com").unwrap()); 484 session.new_relays(OutboxSubId(0), new_urls); 485 486 match expect_task(&session, OutboxSubId(0)) { 487 OutboxTask::Oneshot(task) => { 488 assert!( 489 task.relays.urls.is_empty(), 490 "cannot make modifications on oneshot" 491 ); 492 } 493 _ => panic!("Expected Oneshot task"), 494 } 495 } 496 }