conversation.rs (10186B)
1 use std::cmp::Ordering; 2 3 use crate::{ 4 cache::{ 5 message_store::NotePkg, 6 registry::{ 7 ConversationId, ConversationIdentifierUnowned, ConversationRegistry, 8 ParticipantSetUnowned, 9 }, 10 }, 11 convo_renderable::ConversationRenderable, 12 nip17::{chatroom_filter, conversation_filter, get_participants}, 13 }; 14 15 use super::message_store::MessageStore; 16 use enostr::Pubkey; 17 use hashbrown::HashMap; 18 use nostrdb::{Ndb, Note, NoteKey, QueryResult, Subscription, Transaction}; 19 use notedeck::{note::event_tag, NoteCache, NoteRef, UnknownIds}; 20 21 pub struct ConversationCache { 22 pub registry: ConversationRegistry, 23 conversations: HashMap<ConversationId, Conversation>, 24 order: Vec<ConversationOrder>, 25 pub state: ConversationListState, 26 pub active: Option<ConversationId>, 27 } 28 29 impl ConversationCache { 30 pub fn new() -> Self { 31 Self::default() 32 } 33 34 pub fn len(&self) -> usize { 35 self.conversations.len() 36 } 37 38 pub fn is_empty(&self) -> bool { 39 self.conversations.is_empty() 40 } 41 42 pub fn get(&self, id: ConversationId) -> Option<&Conversation> { 43 self.conversations.get(&id) 44 } 45 46 pub fn get_id_by_index(&self, i: usize) -> Option<&ConversationId> { 47 Some(&self.order.get(i)?.id) 48 } 49 50 pub fn get_active(&self) -> Option<&Conversation> { 51 self.conversations.get(&self.active?) 52 } 53 54 /// A conversation is "opened" when the user navigates to the conversation 55 #[profiling::function] 56 pub fn open_conversation( 57 &mut self, 58 ndb: &Ndb, 59 txn: &Transaction, 60 id: ConversationId, 61 note_cache: &mut NoteCache, 62 unknown_ids: &mut UnknownIds, 63 selected: &Pubkey, 64 ) { 65 let Some(conversation) = self.conversations.get_mut(&id) else { 66 return; 67 }; 68 69 let pubkeys = conversation.metadata.participants.clone(); 70 let participants: Vec<&[u8; 32]> = pubkeys.iter().map(|p| p.bytes()).collect(); 71 72 // We should try and get more messages... this isn't ideal 73 let chatroom_filter = chatroom_filter(participants, selected); 74 75 let mut updated = false; 76 { 77 profiling::scope!("chatroom_filter"); 78 let results = match ndb.query(txn, &chatroom_filter, 500) { 79 Ok(r) => r, 80 Err(e) => { 81 tracing::error!("problem with chatroom filter ndb::query: {e:?}"); 82 return; 83 } 84 }; 85 86 for res in results { 87 let participants = get_participants(&res.note); 88 let parts = ParticipantSetUnowned::new(participants); 89 let cur_id = self 90 .registry 91 .get_or_insert(ConversationIdentifierUnowned::Nip17(parts)); 92 93 if cur_id != id { 94 // this note isn't relevant to the current conversation, unfortunately... 95 continue; 96 } 97 98 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &res.note); 99 updated |= conversation.ingest_kind_14(res.note, res.note_key); 100 } 101 } 102 103 if updated { 104 let latest = conversation.last_activity(); 105 refresh_order(&mut self.order, id, LatestMessage::Latest(latest)); 106 } 107 108 self.active = Some(id); 109 tracing::info!("Set active to {id}"); 110 } 111 112 #[profiling::function] 113 pub fn init_conversations( 114 &mut self, 115 ndb: &Ndb, 116 txn: &Transaction, 117 cur_acc: &Pubkey, 118 note_cache: &mut NoteCache, 119 unknown_ids: &mut UnknownIds, 120 ) { 121 let Some(results) = get_conversations(ndb, txn, cur_acc) else { 122 tracing::warn!("Got no conversations from ndb"); 123 return; 124 }; 125 126 tracing::trace!("Received {} conversations from ndb", results.len()); 127 128 for res in results { 129 self.ingest_chatroom_msg(res.note, res.note_key, ndb, txn, note_cache, unknown_ids); 130 } 131 } 132 133 #[profiling::function] 134 pub fn ingest_chatroom_msg( 135 &mut self, 136 note: Note, 137 key: NoteKey, 138 ndb: &Ndb, 139 txn: &Transaction, 140 note_cache: &mut NoteCache, 141 unknown_ids: &mut UnknownIds, 142 ) { 143 let participants = get_participants(¬e); 144 145 let id = self 146 .registry 147 .get_or_insert(ConversationIdentifierUnowned::Nip17( 148 ParticipantSetUnowned::new(participants.clone()), 149 )); 150 151 let conversation = self.conversations.entry(id).or_insert_with(|| { 152 let participants: Vec<Pubkey> = 153 participants.into_iter().map(|p| Pubkey::new(*p)).collect(); 154 155 Conversation::new(id, participants) 156 }); 157 158 tracing::trace!("ingesting into conversation id {id}: {:?}", note.json()); 159 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 160 if conversation.ingest_kind_14(note, key) { 161 let latest = conversation.last_activity(); 162 refresh_order(&mut self.order, id, LatestMessage::Latest(latest)); 163 } 164 } 165 166 pub fn initialize_conversation(&mut self, id: ConversationId, participants: Vec<Pubkey>) { 167 if self.conversations.contains_key(&id) { 168 return; 169 } 170 171 self.conversations 172 .insert(id, Conversation::new(id, participants)); 173 174 refresh_order(&mut self.order, id, LatestMessage::NoMessages); 175 } 176 177 pub fn first_convo_id(&self) -> Option<ConversationId> { 178 Some(self.order.first()?.id) 179 } 180 } 181 182 fn refresh_order(order: &mut Vec<ConversationOrder>, id: ConversationId, latest: LatestMessage) { 183 if let Some(pos) = order.iter().position(|entry| entry.id == id) { 184 order.remove(pos); 185 } 186 187 let entry = ConversationOrder { id, latest }; 188 let idx = match order.binary_search(&entry) { 189 Ok(idx) | Err(idx) => idx, 190 }; 191 order.insert(idx, entry); 192 } 193 194 #[derive(Clone, Copy, Debug)] 195 struct ConversationOrder { 196 id: ConversationId, 197 latest: LatestMessage, 198 } 199 200 #[derive(Clone, Copy, Debug, PartialEq, Eq)] 201 enum LatestMessage { 202 NoMessages, 203 Latest(u64), 204 } 205 206 impl PartialOrd for LatestMessage { 207 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 208 Some(self.cmp(other)) 209 } 210 } 211 212 impl Ord for LatestMessage { 213 fn cmp(&self, other: &Self) -> Ordering { 214 match (self, other) { 215 (LatestMessage::Latest(a), LatestMessage::Latest(b)) => a.cmp(b), 216 (LatestMessage::NoMessages, LatestMessage::NoMessages) => Ordering::Equal, 217 (LatestMessage::NoMessages, _) => Ordering::Greater, 218 (_, LatestMessage::NoMessages) => Ordering::Less, 219 } 220 } 221 } 222 223 impl PartialEq for ConversationOrder { 224 fn eq(&self, other: &Self) -> bool { 225 self.id == other.id 226 } 227 } 228 229 impl Eq for ConversationOrder {} 230 231 impl PartialOrd for ConversationOrder { 232 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 233 Some(self.cmp(other)) 234 } 235 } 236 237 impl Ord for ConversationOrder { 238 fn cmp(&self, other: &Self) -> Ordering { 239 // newer first 240 match other.latest.cmp(&self.latest) { 241 Ordering::Equal => self.id.cmp(&other.id), 242 non_eq => non_eq, 243 } 244 } 245 } 246 247 pub struct Conversation { 248 pub id: ConversationId, 249 pub messages: MessageStore, 250 pub metadata: ConversationMetadata, 251 pub renderable: ConversationRenderable, 252 } 253 254 impl Conversation { 255 pub fn new(id: ConversationId, participants: Vec<Pubkey>) -> Self { 256 Self { 257 id, 258 messages: MessageStore::default(), 259 metadata: ConversationMetadata::new(participants), 260 renderable: ConversationRenderable::new(&[]), 261 } 262 } 263 264 fn last_activity(&self) -> u64 { 265 self.messages.newest_timestamp().unwrap_or(0) 266 } 267 268 pub fn ingest_kind_14(&mut self, note: Note, key: NoteKey) -> bool { 269 if note.kind() != 14 { 270 tracing::error!("tried to ingest a non-kind 14 note..."); 271 return false; 272 } 273 274 if let Some(title) = event_tag(¬e, "subject") { 275 let created = note.created_at(); 276 277 if self 278 .metadata 279 .title 280 .as_ref() 281 .is_none_or(|cur| created > cur.last_modified) 282 { 283 self.metadata.title = Some(TitleMetadata { 284 title: title.to_string(), 285 last_modified: created, 286 }); 287 } 288 } 289 290 let inserted = self.messages.insert(NotePkg { 291 note_ref: NoteRef { 292 key, 293 created_at: note.created_at(), 294 }, 295 author: Pubkey::new(*note.pubkey()), 296 }); 297 298 if inserted { 299 self.renderable = ConversationRenderable::new(&self.messages.messages_ordered); 300 } 301 302 inserted 303 } 304 } 305 306 impl Default for ConversationCache { 307 fn default() -> Self { 308 Self { 309 registry: ConversationRegistry::default(), 310 conversations: HashMap::new(), 311 order: Vec::new(), 312 state: Default::default(), 313 active: None, 314 } 315 } 316 } 317 318 #[profiling::function] 319 fn get_conversations<'a>( 320 ndb: &Ndb, 321 txn: &'a Transaction, 322 cur_acc: &Pubkey, 323 ) -> Option<Vec<QueryResult<'a>>> { 324 match ndb.query(txn, &conversation_filter(cur_acc), 500) { 325 Ok(r) => Some(r), 326 Err(e) => { 327 tracing::error!("error fetching kind 14 messages: {e}"); 328 None 329 } 330 } 331 } 332 333 #[derive(Clone, Debug, Default)] 334 pub struct ConversationMetadata { 335 pub title: Option<TitleMetadata>, 336 pub participants: Vec<Pubkey>, 337 } 338 339 #[derive(Clone, Debug)] 340 pub struct TitleMetadata { 341 pub title: String, 342 pub last_modified: u64, 343 } 344 345 impl ConversationMetadata { 346 pub fn new(participants: Vec<Pubkey>) -> Self { 347 Self { 348 title: None, 349 participants, 350 } 351 } 352 } 353 354 #[derive(Default)] 355 pub enum ConversationListState { 356 #[default] 357 Initializing, 358 Initialized(Option<Subscription>), // conversation list filter 359 }