subscription.rs (13884B)
1 use hashbrown::{HashMap, HashSet}; 2 use nostrdb::Filter; 3 4 use crate::relay::{MetadataFilters, NormRelayUrl, OutboxSubId, RelayType, RelayUrlPkgs}; 5 6 pub struct OutboxSubscription { 7 pub relays: HashSet<NormRelayUrl>, 8 pub filters: MetadataFilters, 9 json_size: usize, 10 pub is_oneshot: bool, 11 pub relay_type: RelayType, 12 } 13 14 impl OutboxSubscription { 15 pub fn see_all(&mut self, at: u64) { 16 for (_, meta) in self.filters.iter_mut() { 17 meta.last_seen = Some(at); 18 } 19 } 20 21 pub fn ingest_task(&mut self, task: ModifyTask) { 22 match task { 23 ModifyTask::Filters(modify_filters_task) => { 24 self.filters = MetadataFilters::new(modify_filters_task.0); 25 self.json_size = self.filters.json_size_sum(); 26 } 27 ModifyTask::Relays(modify_relays_task) => { 28 self.relays = modify_relays_task.0; 29 } 30 ModifyTask::Full(full_modification_task) => { 31 self.filters = MetadataFilters::new(full_modification_task.filters); 32 self.json_size = self.filters.json_size_sum(); 33 self.relays = full_modification_task.relays; 34 } 35 } 36 } 37 } 38 39 #[derive(Default)] 40 pub struct OutboxSubscriptions { 41 subs: HashMap<OutboxSubId, OutboxSubscription>, 42 } 43 44 impl OutboxSubscriptions { 45 pub fn view(&self, id: &OutboxSubId) -> Option<SubscriptionView<'_>> { 46 let sub = self.subs.get(id)?; 47 48 Some(SubscriptionView { 49 id: *id, 50 filters: &sub.filters, 51 json_size: sub.json_size, 52 is_oneshot: sub.is_oneshot, 53 }) 54 } 55 56 pub fn json_size(&self, id: &OutboxSubId) -> Option<usize> { 57 self.subs.get(id).map(|s| s.json_size) 58 } 59 60 pub fn subset_oneshot(&self, ids: &HashSet<OutboxSubId>) -> HashSet<OutboxSubId> { 61 ids.iter() 62 .filter(|id| self.subs.get(*id).is_some_and(|s| s.is_oneshot)) 63 .copied() 64 .collect() 65 } 66 67 pub fn is_oneshot(&self, id: &OutboxSubId) -> bool { 68 self.subs.get(id).is_some_and(|s| s.is_oneshot) 69 } 70 71 pub fn json_size_sum(&self, ids: &HashSet<OutboxSubId>) -> usize { 72 ids.iter() 73 .map(|id| self.subs.get(id).map_or(0, |s| s.json_size)) 74 .sum() 75 } 76 77 pub fn filters_all(&self, ids: &HashSet<OutboxSubId>) -> Vec<Filter> { 78 ids.iter() 79 .filter_map(|id| self.subs.get(id)) 80 .flat_map(|sub| sub.filters.filters.iter().cloned()) 81 .collect() 82 } 83 84 pub fn get_mut(&mut self, id: &OutboxSubId) -> Option<&mut OutboxSubscription> { 85 self.subs.get_mut(id) 86 } 87 88 pub fn get(&self, id: &OutboxSubId) -> Option<&OutboxSubscription> { 89 self.subs.get(id) 90 } 91 92 pub fn remove(&mut self, id: &OutboxSubId) { 93 self.subs.remove(id); 94 } 95 96 pub fn new_subscription(&mut self, id: OutboxSubId, task: SubscribeTask, is_oneshot: bool) { 97 let filters = MetadataFilters::new(task.filters); 98 let json_size = filters.json_size_sum(); 99 self.subs.insert( 100 id, 101 OutboxSubscription { 102 relays: task.relays.urls, 103 filters, 104 json_size, 105 is_oneshot, 106 relay_type: if task.relays.use_transparent { 107 RelayType::Transparent 108 } else { 109 RelayType::Compaction 110 }, 111 }, 112 ); 113 } 114 } 115 116 pub struct SubscriptionView<'a> { 117 pub id: OutboxSubId, 118 pub filters: &'a MetadataFilters, 119 #[allow(dead_code)] 120 pub json_size: usize, 121 #[allow(dead_code)] 122 pub is_oneshot: bool, 123 } 124 125 pub enum OutboxTask { 126 Modify(ModifyTask), 127 Subscribe(SubscribeTask), 128 Unsubscribe, 129 Oneshot(SubscribeTask), 130 } 131 132 pub enum ModifyTask { 133 Filters(ModifyFiltersTask), 134 Relays(ModifyRelaysTask), 135 Full(FullModificationTask), 136 } 137 138 #[derive(Default)] 139 pub struct ModifyFiltersTask(pub Vec<Filter>); 140 141 pub struct ModifyRelaysTask(pub HashSet<NormRelayUrl>); 142 143 pub struct FullModificationTask { 144 pub filters: Vec<Filter>, 145 pub relays: HashSet<NormRelayUrl>, 146 } 147 148 pub struct SubscribeTask { 149 pub filters: Vec<Filter>, 150 pub relays: RelayUrlPkgs, 151 } 152 153 #[cfg(test)] 154 mod tests { 155 use super::*; 156 use crate::relay::RelayUrlPkgs; 157 use crate::relay::{FullModificationTask, ModifyFiltersTask}; 158 159 fn subscribe_task(filters: Vec<Filter>, urls: RelayUrlPkgs) -> SubscribeTask { 160 SubscribeTask { 161 filters, 162 relays: urls, 163 } 164 } 165 166 fn relay_urls(url: &str) -> HashSet<NormRelayUrl> { 167 let mut urls = HashSet::new(); 168 let relay = NormRelayUrl::new(url).unwrap(); 169 urls.insert(relay); 170 urls 171 } 172 173 /// new_subscription should persist relay metadata and expose it via view(). 174 #[test] 175 fn new_subscription_records_metadata() { 176 let mut subs = OutboxSubscriptions::default(); 177 let mut pkgs = RelayUrlPkgs::new(relay_urls("wss://relay-meta.example.com")); 178 pkgs.use_transparent = true; 179 let filters = vec![Filter::new().kinds(vec![1]).limit(4).build()]; 180 let id = OutboxSubId(7); 181 182 subs.new_subscription(id, subscribe_task(filters.clone(), pkgs), true); 183 184 let view = subs.view(&id).expect("subscription view"); 185 assert_eq!(view.id, id); 186 assert!(view.is_oneshot); 187 assert_eq!(view.filters.get_filters().len(), filters.len()); 188 assert!(view.json_size > 0); 189 190 let sub = subs.get_mut(&id).expect("subscription metadata"); 191 assert_eq!(sub.relays.len(), 1); 192 assert_eq!(sub.relay_type, RelayType::Transparent); 193 } 194 195 /// subset_oneshot should only return IDs corresponding to oneshot subscriptions. 196 #[test] 197 fn subset_oneshot_filters_ids() { 198 let mut subs = OutboxSubscriptions::default(); 199 let filters = vec![Filter::new().kinds(vec![1]).build()]; 200 let id_a = OutboxSubId(1); 201 let id_b = OutboxSubId(2); 202 subs.new_subscription( 203 id_a, 204 subscribe_task( 205 filters.clone(), 206 RelayUrlPkgs::new(relay_urls("wss://relay-a.example")), 207 ), 208 false, 209 ); 210 subs.new_subscription( 211 id_b, 212 subscribe_task( 213 filters, 214 RelayUrlPkgs::new(relay_urls("wss://relay-b.example")), 215 ), 216 true, 217 ); 218 219 let mut ids = HashSet::new(); 220 ids.insert(id_a); 221 ids.insert(id_b); 222 223 let oneshots = subs.subset_oneshot(&ids); 224 let expected = { 225 let mut s = HashSet::new(); 226 s.insert(id_b); 227 s 228 }; 229 assert_eq!(oneshots, expected); 230 } 231 232 /// json_size_sum aggregates the JSON payload size for the requested subscriptions. 233 #[test] 234 fn json_size_sum_accumulates_sizes() { 235 let mut subs = OutboxSubscriptions::default(); 236 let filters = vec![Filter::new().kinds(vec![1]).build()]; 237 let id_a = OutboxSubId(1); 238 let id_b = OutboxSubId(2); 239 subs.new_subscription( 240 id_a, 241 subscribe_task( 242 filters.clone(), 243 RelayUrlPkgs::new(relay_urls("wss://relay-json-a.example")), 244 ), 245 false, 246 ); 247 subs.new_subscription( 248 id_b, 249 subscribe_task( 250 filters, 251 RelayUrlPkgs::new(relay_urls("wss://relay-json-b.example")), 252 ), 253 false, 254 ); 255 256 let mut ids = HashSet::new(); 257 ids.insert(id_a); 258 ids.insert(id_b); 259 260 let sum = subs.json_size_sum(&ids); 261 let expected = subs.json_size(&id_a).unwrap() + subs.json_size(&id_b).unwrap(); 262 assert_eq!(sum, expected); 263 } 264 265 /// see_all should mark every filter as seen at the provided timestamp. 266 #[test] 267 fn see_all_marks_filters() { 268 let mut subs = OutboxSubscriptions::default(); 269 let id = OutboxSubId(8); 270 subs.new_subscription( 271 id, 272 subscribe_task( 273 vec![ 274 Filter::new().kinds(vec![1]).limit(2).build(), 275 Filter::new().kinds(vec![4]).limit(1).build(), 276 ], 277 RelayUrlPkgs::new(relay_urls("wss://relay-see.example")), 278 ), 279 false, 280 ); 281 282 let timestamp = 12345; 283 let sub = subs.get_mut(&id).expect("subscription metadata"); 284 sub.see_all(timestamp); 285 286 assert!(sub 287 .filters 288 .iter() 289 .all(|(_, meta)| meta.last_seen == Some(timestamp))); 290 } 291 292 /// ingest_task should update json_size when filters are modified. 293 #[test] 294 fn ingest_task_updates_json_size_on_filter_change() { 295 let mut subs = OutboxSubscriptions::default(); 296 let id = OutboxSubId(9); 297 let small_filters = vec![Filter::new().kinds(vec![1]).build()]; 298 subs.new_subscription( 299 id, 300 subscribe_task( 301 small_filters, 302 RelayUrlPkgs::new(relay_urls("wss://relay-ingest.example")), 303 ), 304 false, 305 ); 306 307 let original_size = subs.json_size(&id).unwrap(); 308 309 // Modify with larger filters 310 let large_filters = vec![ 311 Filter::new().kinds(vec![1, 2, 3, 4, 5]).limit(100).build(), 312 Filter::new().kinds(vec![6, 7, 8]).limit(50).build(), 313 ]; 314 let sub = subs.get_mut(&id).unwrap(); 315 sub.ingest_task(ModifyTask::Filters(ModifyFiltersTask(large_filters))); 316 317 let new_size = subs.json_size(&id).unwrap(); 318 assert_ne!( 319 original_size, new_size, 320 "json_size should change after filter modification" 321 ); 322 assert!( 323 new_size > original_size, 324 "larger filters should have larger json_size" 325 ); 326 } 327 328 /// ingest_task with Full modification should update json_size. 329 #[test] 330 fn ingest_task_updates_json_size_on_full_change() { 331 let mut subs = OutboxSubscriptions::default(); 332 let id = OutboxSubId(10); 333 let small_filters = vec![Filter::new().kinds(vec![1]).build()]; 334 subs.new_subscription( 335 id, 336 subscribe_task( 337 small_filters, 338 RelayUrlPkgs::new(relay_urls("wss://relay-full.example")), 339 ), 340 false, 341 ); 342 343 let original_size = subs.json_size(&id).unwrap(); 344 345 // Full modification with larger filters 346 let large_filters = vec![ 347 Filter::new().kinds(vec![1, 2, 3, 4, 5]).limit(100).build(), 348 Filter::new().kinds(vec![6, 7, 8]).limit(50).build(), 349 ]; 350 let sub = subs.get_mut(&id).unwrap(); 351 sub.ingest_task(ModifyTask::Full(FullModificationTask { 352 filters: large_filters, 353 relays: relay_urls("wss://new-relay.example"), 354 })); 355 356 let new_size = subs.json_size(&id).unwrap(); 357 assert_ne!( 358 original_size, new_size, 359 "json_size should change after full modification" 360 ); 361 assert!( 362 new_size > original_size, 363 "larger filters should have larger json_size" 364 ); 365 } 366 367 fn filter_has_since(filter: &Filter, expected: u64) -> bool { 368 let json = filter.json().expect("filter json"); 369 json.contains(&format!("\"since\":{}", expected)) 370 } 371 372 /// Full flow: see_all sets last_seen, then since_optimize applies it to filters. 373 #[test] 374 fn see_all_then_since_optimize_applies_since_to_filters() { 375 let mut subs = OutboxSubscriptions::default(); 376 let id = OutboxSubId(11); 377 let filters = vec![ 378 Filter::new().kinds(vec![1]).build(), 379 Filter::new().kinds(vec![2]).build(), 380 ]; 381 subs.new_subscription( 382 id, 383 subscribe_task( 384 filters, 385 RelayUrlPkgs::new(relay_urls("wss://relay-since.example")), 386 ), 387 false, 388 ); 389 390 // Verify filters don't have since initially 391 let view = subs.view(&id).unwrap(); 392 for filter in view.filters.get_filters() { 393 let json = filter.json().expect("filter json"); 394 assert!( 395 !json.contains("\"since\""), 396 "filter should not have since initially" 397 ); 398 } 399 400 let timestamp = 1700000000u64; 401 let sub = subs.get_mut(&id).unwrap(); 402 sub.see_all(timestamp); 403 sub.filters.since_optimize(); 404 405 // Verify filters now have since 406 let view = subs.view(&id).unwrap(); 407 for filter in view.filters.get_filters() { 408 assert!( 409 filter_has_since(filter, timestamp), 410 "filter should have since after see_all + since_optimize" 411 ); 412 } 413 } 414 415 /// Filters accessed via view() should have since after optimization. 416 #[test] 417 fn view_returns_optimized_filters() { 418 let mut subs = OutboxSubscriptions::default(); 419 let id = OutboxSubId(12); 420 let filters = vec![Filter::new().kinds(vec![1]).build()]; 421 subs.new_subscription( 422 id, 423 subscribe_task( 424 filters, 425 RelayUrlPkgs::new(relay_urls("wss://relay-view.example")), 426 ), 427 false, 428 ); 429 430 let timestamp = 1234567890u64; 431 { 432 let sub = subs.get_mut(&id).unwrap(); 433 sub.see_all(timestamp); 434 sub.filters.since_optimize(); 435 } 436 437 // Access via view - should see the optimized filters 438 let view = subs.view(&id).unwrap(); 439 let filter = &view.filters.get_filters()[0]; 440 assert!( 441 filter_has_since(filter, timestamp), 442 "view should return filters with since applied" 443 ); 444 } 445 }