notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 21c00a41e0b6b62db22327c89cee7c06dbab5057
parent bf34175560076e3b112e9186943cf105e8d94726
Author: William Casarin <jb55@jb55.com>
Date:   Mon,  2 Sep 2024 17:47:54 -0700

Merge contact list fetching

William Casarin (15):
      cli: add --pub support for watchonly accounts
      column: extract into_timeline logic into ColumnKind
      contacts: fix hashtags in filter_from_tags
      docs: fix comment in the wrong spot
      fetch contact lists
      filter: create filter from contact list
      nostrdb: bump version
      perf: coordinate unknown id lookups
      refactor: move args to its own file
      tidy: move ColumnKind to its own file
      tidy: move parse_args to Args::parse
      tidy: organize bools
      tidy: remove some crate:: namespaces
      timeline: initial contact queries

Fixes: https://github.com/damus-io/notedeck/issues/236
Fixes: https://github.com/damus-io/notedeck/issues/6

Diffstat:
MCargo.lock | 2+-
MCargo.toml | 2+-
Menostr/Cargo.toml | 2+-
Menostr/src/keypair.rs | 4++--
Menostr/src/pubkey.rs | 13++++++++++---
Msrc/actionbar.rs | 2+-
Msrc/app.rs | 704++++++++++++++++++++++++++++++++++++-------------------------------------------
Asrc/args.rs | 189+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/column.rs | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/error.rs | 35+++++++++++++++++++++++++++++++++--
Msrc/filter.rs | 167++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/lib.rs | 4++++
Asrc/subscriptions.rs | 23+++++++++++++++++++++++
Msrc/thread.rs | 12++++++------
Msrc/timeline.rs | 81++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
Msrc/ui/profile/picture.rs | 2+-
Msrc/ui/thread.rs | 10+++-------
Asrc/unknowns.rs | 278+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
18 files changed, 1183 insertions(+), 444 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -2296,7 +2296,7 @@ dependencies = [ [[package]] name = "nostrdb" version = "0.3.4" -source = "git+https://github.com/damus-io/nostrdb-rs?rev=4c89dcbca13168758eb41752225b4e486dbc9d20#4c89dcbca13168758eb41752225b4e486dbc9d20" +source = "git+https://github.com/damus-io/nostrdb-rs?rev=6d22af6d5159be4c9e4579f8c9d3af836e0d470a#6d22af6d5159be4c9e4579f8c9d3af836e0d470a" dependencies = [ "bindgen", "cc", diff --git a/Cargo.toml b/Cargo.toml @@ -28,7 +28,7 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence tracing = "0.1.40" #wasm-bindgen = "0.2.83" -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "4c89dcbca13168758eb41752225b4e486dbc9d20" } +nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "6d22af6d5159be4c9e4579f8c9d3af836e0d470a" } #nostrdb = { path = "/Users/jb55/dev/github/damus-io/nostrdb-rs" } #nostrdb = "0.3.4" enostr = { path = "enostr" } diff --git a/enostr/Cargo.toml b/enostr/Cargo.toml @@ -11,7 +11,7 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence serde_json = "1.0.89" nostr = { version = "0.30.0" } -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "4c89dcbca13168758eb41752225b4e486dbc9d20" } +nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "6d22af6d5159be4c9e4579f8c9d3af836e0d470a" } hex = "0.4.3" tracing = "0.1.40" env_logger = "0.11.1" diff --git a/enostr/src/keypair.rs b/enostr/src/keypair.rs @@ -16,7 +16,7 @@ impl Keypair { let cloned_secret_key = secret_key.clone(); let nostr_keys = nostr::Keys::new(secret_key); Keypair { - pubkey: Pubkey::new(&nostr_keys.public_key().to_bytes()), + pubkey: Pubkey::new(nostr_keys.public_key().to_bytes()), secret_key: Some(cloned_secret_key), } } @@ -61,7 +61,7 @@ impl FullKeypair { let (xopk, _) = secret_key.x_only_public_key(&nostr::SECP256K1); let secret_key = nostr::SecretKey::from(*secret_key); FullKeypair { - pubkey: Pubkey::new(&xopk.serialize()), + pubkey: Pubkey::new(xopk.serialize()), secret_key, } } diff --git a/enostr/src/pubkey.rs b/enostr/src/pubkey.rs @@ -5,14 +5,14 @@ use nostr::bech32::Hrp; use std::fmt; use tracing::debug; -#[derive(Debug, Eq, PartialEq, Clone, Hash)] +#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash)] pub struct Pubkey([u8; 32]); static HRP_NPUB: Hrp = Hrp::parse_unchecked("npub"); impl Pubkey { - pub fn new(data: &[u8; 32]) -> Self { - Self(*data) + pub fn new(data: [u8; 32]) -> Self { + Self(data) } pub fn hex(&self) -> String { @@ -23,6 +23,13 @@ impl Pubkey { &self.0 } + pub fn parse(s: &str) -> Result<Self, Error> { + match Pubkey::from_hex(s) { + Ok(pk) => Ok(pk), + Err(_) => Pubkey::try_from_bech32_string(s, false), + } + } + pub fn from_hex(hex_str: &str) -> Result<Self, Error> { Ok(Pubkey(hex::decode(hex_str)?.as_slice().try_into()?)) } diff --git a/src/actionbar.rs b/src/actionbar.rs @@ -76,7 +76,7 @@ fn open_thread( // an active subscription for this thread. if thread.subscription().is_none() { let filters = Thread::filters(root_id); - *thread.subscription_mut() = app.ndb.subscribe(filters.clone()).ok(); + *thread.subscription_mut() = app.ndb.subscribe(&filters).ok(); if thread.remote_subscription().is_some() { error!("Found active remote subscription when it was not expected"); diff --git a/src/app.rs b/src/app.rs @@ -2,7 +2,11 @@ use crate::account_manager::AccountManager; use crate::actionbar::BarResult; use crate::app_creation::setup_cc; use crate::app_style::user_requested_visuals_change; +use crate::args::Args; +use crate::column::ColumnKind; use crate::draft::Drafts; +use crate::error::{Error, FilterError}; +use crate::filter::FilterState; use crate::frame_history::FrameHistory; use crate::imgcache::ImageCache; use crate::key_storage::KeyStorageType; @@ -10,24 +14,26 @@ use crate::note::NoteRef; use crate::notecache::{CachedNote, NoteCache}; use crate::relay_pool_manager::RelayPoolManager; use crate::route::Route; +use crate::subscriptions::{SubKind, Subscriptions}; use crate::thread::{DecrementResult, Threads}; use crate::timeline::{Timeline, TimelineSource, ViewFilter}; use crate::ui::note::PostAction; use crate::ui::{self, AccountSelectionWidget, DesktopGlobalPopup}; use crate::ui::{DesktopSidePanel, RelayView, View}; -use crate::Result; +use crate::unknowns::UnknownIds; +use crate::{filter, Result}; use egui_nav::{Nav, NavAction}; -use enostr::{ClientMessage, Keypair, RelayEvent, RelayMessage, RelayPool, SecretKey}; +use enostr::{ClientMessage, RelayEvent, RelayMessage, RelayPool}; use std::cell::RefCell; use std::rc::Rc; +use uuid::Uuid; use egui::{Context, Frame, Style}; use egui_extras::{Size, StripBuilder}; -use nostrdb::{BlockType, Config, Filter, Mention, Ndb, Note, NoteKey, Transaction}; +use nostrdb::{Config, Filter, Ndb, Note, Transaction}; -use std::collections::HashSet; -use std::hash::Hash; +use std::collections::HashMap; use std::path::Path; use std::time::Duration; use tracing::{debug, error, info, trace, warn}; @@ -41,26 +47,29 @@ pub enum DamusState { /// We derive Deserialize/Serialize so we can persist app state on shutdown. pub struct Damus { state: DamusState, - //compose: String, note_cache: NoteCache, pub pool: RelayPool, - is_mobile: bool, - pub since_optimize: bool, /// global navigation for account management popups, etc. pub global_nav: Vec<Route>, - pub textmode: bool, pub timelines: Vec<Timeline>, pub selected_timeline: i32, pub ndb: Ndb, + pub unknown_ids: UnknownIds, pub drafts: Drafts, pub threads: Threads, pub img_cache: ImageCache, pub account_manager: AccountManager, + pub subscriptions: Subscriptions, frame_history: crate::frame_history::FrameHistory, + + // TODO: make these flags + is_mobile: bool, + pub since_optimize: bool, + pub textmode: bool, pub show_account_switcher: bool, pub show_global_popup: bool, } @@ -90,47 +99,90 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { } } -/// Should we since optimize? Not always. For examplem if we only have a few -/// notes locally. One way to determine this is by looking at the current filter -/// and seeing what its limit is. If we have less notes than the limit, -/// we might want to backfill older notes -fn send_initial_filters(damus: &mut Damus, relay_url: &str) { - info!("Sending initial filters to {}", relay_url); - let mut c: u32 = 1; - +fn send_initial_timeline_filter(damus: &mut Damus, timeline: usize) { let can_since_optimize = damus.since_optimize; - for relay in &mut damus.pool.relays { - let relay = &mut relay.relay; - if relay.url == relay_url { - for timeline in &damus.timelines { - let filter = timeline.filter.clone(); - let new_filters = filter.into_iter().map(|f| { - // limit the size of remote filters - let default_limit = crate::filter::default_remote_limit(); - let mut lim = f.limit().unwrap_or(default_limit); - let mut filter = f; - if lim > default_limit { - lim = default_limit; - filter = filter.limit_mut(lim); - } - let notes = timeline.notes(ViewFilter::NotesAndReplies); - if can_since_optimize && crate::filter::should_since_optimize(lim, notes.len()) { - filter = crate::filter::since_optimize_filter(filter, notes); - } else { - warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); - } + let filter_state = damus.timelines[timeline].filter.clone(); - filter - }).collect(); - relay.subscribe(format!("initial{}", c), new_filters); - c += 1; - } - return; + match filter_state { + FilterState::Broken(err) => { + error!( + "FetchingRemote state in broken state when sending initial timeline filter? {err}" + ); + } + + FilterState::FetchingRemote(_unisub) => { + error!("FetchingRemote state when sending initial timeline filter?"); + } + + FilterState::GotRemote(_sub) => { + error!("GotRemote state when sending initial timeline filter?"); + } + + FilterState::Ready(filter) => { + let filter = filter.to_owned(); + let new_filters = filter.into_iter().map(|f| { + // limit the size of remote filters + let default_limit = filter::default_remote_limit(); + let mut lim = f.limit().unwrap_or(default_limit); + let mut filter = f; + if lim > default_limit { + lim = default_limit; + filter = filter.limit_mut(lim); + } + + let notes = damus.timelines[timeline].notes(ViewFilter::NotesAndReplies); + + // Should we since optimize? Not always. For example + // if we only have a few notes locally. One way to + // determine this is by looking at the current filter + // and seeing what its limit is. If we have less + // notes than the limit, we might want to backfill + // older notes + if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { + filter = filter::since_optimize_filter(filter, notes); + } else { + warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); + } + + filter + }).collect(); + + let sub_id = Uuid::new_v4().to_string(); + damus + .subscriptions() + .insert(sub_id.clone(), SubKind::Initial); + + damus.pool.subscribe(sub_id, new_filters); + } + + // we need some data first + FilterState::NeedsRemote(filter) => { + let sub_id = Uuid::new_v4().to_string(); + let uid = damus.timelines[timeline].uid; + let local_sub = damus.ndb.subscribe(&filter).expect("sub"); + + damus.timelines[timeline].filter = + FilterState::fetching_remote(sub_id.clone(), local_sub); + + damus + .subscriptions() + .insert(sub_id.clone(), SubKind::FetchingContactList(uid)); + + damus.pool.subscribe(sub_id, filter.to_owned()); } } } +fn send_initial_filters(damus: &mut Damus, relay_url: &str) { + info!("Sending initial filters to {}", relay_url); + let timelines = damus.timelines.len(); + + for i in 0..timelines { + send_initial_timeline_filter(damus, i); + } +} + enum ContextAction { SetPixelsPerPoint(f32), } @@ -211,137 +263,89 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> { } } - let txn = Transaction::new(&damus.ndb)?; - let mut unknown_ids: HashSet<UnknownId> = HashSet::new(); for timeline in 0..damus.timelines.len() { let src = TimelineSource::column(timeline); - if let Err(err) = src.poll_notes_into_view(damus, &txn, &mut unknown_ids) { - error!("{}", err); + + if let Ok(true) = is_timeline_ready(damus, timeline) { + if let Err(err) = src.poll_notes_into_view(damus) { + error!("poll_notes_into_view: {err}"); + } + } else { + // TODO: show loading? } } - let unknown_ids: Vec<UnknownId> = unknown_ids.into_iter().collect(); - if let Some(filters) = get_unknown_ids_filter(&unknown_ids) { - info!( - "Getting {} unknown author profiles from relays", - unknown_ids.len() - ); - let msg = ClientMessage::req("unknown_ids".to_string(), filters); - damus.pool.send(&msg); + if damus.unknown_ids.ready_to_send() { + unknown_id_send(damus); } Ok(()) } -#[derive(Hash, Clone, Copy, PartialEq, Eq)] -pub enum UnknownId<'a> { - Pubkey(&'a [u8; 32]), - Id(&'a [u8; 32]), +fn unknown_id_send(damus: &mut Damus) { + let filter = damus.unknown_ids.filter().expect("filter"); + info!( + "Getting {} unknown ids from relays", + damus.unknown_ids.ids().len() + ); + let msg = ClientMessage::req("unknownids".to_string(), filter); + damus.unknown_ids.clear(); + damus.pool.send(&msg); } -impl<'a> UnknownId<'a> { - pub fn is_pubkey(&self) -> Option<&'a [u8; 32]> { - match self { - UnknownId::Pubkey(pk) => Some(pk), - _ => None, - } - } +/// Check our timeline filter and see if we have any filter data ready. +/// Our timelines may require additional data before it is functional. For +/// example, when we have to fetch a contact list before we do the actual +/// following list query. +fn is_timeline_ready(damus: &mut Damus, timeline: usize) -> Result<bool> { + let sub = match &damus.timelines[timeline].filter { + FilterState::GotRemote(sub) => *sub, + FilterState::Ready(_f) => return Ok(true), + _ => return Ok(false), + }; - pub fn is_id(&self) -> Option<&'a [u8; 32]> { - match self { - UnknownId::Id(id) => Some(id), - _ => None, - } + // We got at least one eose for our filter request. Let's see + // if nostrdb is done processing it yet. + let res = damus.ndb.poll_for_notes(sub, 1); + if res.is_empty() { + debug!("check_timeline_filter_state: no notes found (yet?) for timeline {timeline}"); + return Ok(false); } -} -/// Look for missing notes in various parts of notes that we see: -/// -/// - pubkeys and notes mentioned inside the note -/// - notes being replied to -/// -/// We return all of this in a HashSet so that we can fetch these from -/// remote relays. -/// -pub fn get_unknown_note_ids<'a>( - ndb: &Ndb, - cached_note: &CachedNote, - txn: &'a Transaction, - note: &Note<'a>, - note_key: NoteKey, - ids: &mut HashSet<UnknownId<'a>>, -) -> Result<()> { - // the author pubkey - - if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { - ids.insert(UnknownId::Pubkey(note.pubkey())); - } + info!("notes found for contact timeline after GotRemote!"); - // pull notes that notes are replying to - if cached_note.reply.root.is_some() { - let note_reply = cached_note.reply.borrow(note.tags()); - if let Some(root) = note_reply.root() { - if ndb.get_note_by_id(txn, root.id).is_err() { - ids.insert(UnknownId::Id(root.id)); - } - } + let note_key = res[0]; - if !note_reply.is_reply_to_root() { - if let Some(reply) = note_reply.reply() { - if ndb.get_note_by_id(txn, reply.id).is_err() { - ids.insert(UnknownId::Id(reply.id)); - } - } - } - } + let filter = { + let txn = Transaction::new(&damus.ndb).expect("txn"); + let note = damus.ndb.get_note_by_key(&txn, note_key).expect("note"); + filter::filter_from_tags(&note).map(|f| f.into_follow_filter()) + }; - let blocks = ndb.get_blocks_by_key(txn, note_key)?; - for block in blocks.iter(note) { - if block.blocktype() != BlockType::MentionBech32 { - continue; + // TODO: into_follow_filter is hardcoded to contact lists, let's generalize + match filter { + Err(Error::Filter(e)) => { + error!("got broken when building filter {e}"); + damus.timelines[timeline].filter = FilterState::broken(e); } - - match block.as_mention().unwrap() { - Mention::Pubkey(npub) => { - if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() { - ids.insert(UnknownId::Pubkey(npub.pubkey())); - } - } - Mention::Profile(nprofile) => { - if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() { - ids.insert(UnknownId::Pubkey(nprofile.pubkey())); - } - } - Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) { - Err(_) => { - ids.insert(UnknownId::Id(ev.id())); - if let Some(pk) = ev.pubkey() { - if ndb.get_profile_by_pubkey(txn, pk).is_err() { - ids.insert(UnknownId::Pubkey(pk)); - } - } - } - Ok(note) => { - if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { - ids.insert(UnknownId::Pubkey(note.pubkey())); - } - } - }, - Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) { - Err(_) => { - ids.insert(UnknownId::Id(note.id())); - } - Ok(note) => { - if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { - ids.insert(UnknownId::Pubkey(note.pubkey())); - } - } - }, - _ => {} + Err(err) => { + error!("got broken when building filter {err}"); + damus.timelines[timeline].filter = FilterState::broken(FilterError::EmptyContactList); + return Err(err); + } + Ok(filter) => { + // we just switched to the ready state, we should send initial + // queries and setup the local subscription + info!("Found contact list! Setting up local and remote contact list query"); + setup_initial_timeline(damus, timeline, &filter).expect("setup init"); + damus.timelines[timeline].filter = FilterState::ready(filter.clone()); + + let subid = Uuid::new_v4().to_string(); + damus.pool.subscribe(subid, filter) } } - Ok(()) + Ok(true) } #[cfg(feature = "profiling")] @@ -349,45 +353,61 @@ fn setup_profiling() { puffin::set_scopes_on(true); // tell puffin to collect data } +fn setup_initial_timeline(damus: &mut Damus, timeline: usize, filters: &[Filter]) -> Result<()> { + damus.timelines[timeline].subscription = Some(damus.ndb.subscribe(filters)?); + let txn = Transaction::new(&damus.ndb)?; + debug!( + "querying nostrdb sub {:?} {:?}", + damus.timelines[timeline].subscription, damus.timelines[timeline].filter + ); + let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = damus.ndb.query(&txn, filters, lim)?; + + let filters = { + let views = &damus.timelines[timeline].views; + let filters: Vec<fn(&CachedNote, &Note) -> bool> = + views.iter().map(|v| v.filter.filter()).collect(); + filters + }; + + for result in results { + for (view, filter) in filters.iter().enumerate() { + if filter( + damus + .note_cache_mut() + .cached_note_or_insert_mut(result.note_key, &result.note), + &result.note, + ) { + damus.timelines[timeline].views[view].notes.push(NoteRef { + key: result.note_key, + created_at: result.note.created_at(), + }) + } + } + } + + Ok(()) +} + fn setup_initial_nostrdb_subs(damus: &mut Damus) -> Result<()> { let timelines = damus.timelines.len(); for i in 0..timelines { - let filters = damus.timelines[i].filter.clone(); - damus.timelines[i].subscription = Some(damus.ndb.subscribe(filters.clone())?); - let txn = Transaction::new(&damus.ndb)?; - debug!( - "querying nostrdb sub {} {:?}", - damus.timelines[i].subscription.as_ref().unwrap().id, - damus.timelines[i].filter - ); - let results = damus.ndb.query( - &txn, - filters, - damus.timelines[i].filter[0] - .limit() - .unwrap_or(crate::filter::default_limit()) as i32, - )?; - - let filters = { - let views = &damus.timelines[i].views; - let filters: Vec<fn(&CachedNote, &Note) -> bool> = - views.iter().map(|v| v.filter.filter()).collect(); - filters - }; + let filter = damus.timelines[i].filter.clone(); + match filter { + FilterState::Ready(filters) => setup_initial_timeline(damus, i, &filters)?, - for result in results { - for (j, filter) in filters.iter().enumerate() { - if filter( - damus - .note_cache_mut() - .cached_note_or_insert_mut(result.note_key, &result.note), - &result.note, - ) { - damus.timelines[i].views[j].notes.push(NoteRef { - key: result.note_key, - created_at: result.note.created_at(), - }) - } + FilterState::Broken(err) => { + error!("FetchingRemote state broken in setup_initial_nostr_subs: {err}") + } + FilterState::FetchingRemote(_) => { + error!("FetchingRemote state in setup_initial_nostr_subs") + } + FilterState::GotRemote(_) => { + error!("GotRemote state in setup_initial_nostr_subs") + } + FilterState::NeedsRemote(_filters) => { + // can't do anything yet, we defer to first connect to send + // remote filters } } } @@ -401,6 +421,10 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { setup_profiling(); damus.state = DamusState::Initialized; + // this lets our eose handler know to close unknownids right away + damus + .subscriptions() + .insert("unknownids".to_string(), SubKind::OneShot); setup_initial_nostrdb_subs(damus).expect("home subscription failed"); } @@ -419,82 +443,75 @@ fn process_event(damus: &mut Damus, _subid: &str, event: &str) { } } -fn get_unknown_ids<'a>(txn: &'a Transaction, damus: &mut Damus) -> Result<Vec<UnknownId<'a>>> { - #[cfg(feature = "profiling")] - puffin::profile_function!(); - - let mut ids: HashSet<UnknownId> = HashSet::new(); - let mut new_cached_notes: Vec<(NoteKey, CachedNote)> = vec![]; - - for timeline in &damus.timelines { - for noteref in timeline.notes(ViewFilter::NotesAndReplies) { - let note = damus.ndb.get_note_by_key(txn, noteref.key)?; - let note_key = note.key().unwrap(); - let cached_note = damus.note_cache().cached_note(noteref.key); - let cached_note = if let Some(cn) = cached_note { - cn.clone() - } else { - let new_cached_note = CachedNote::new(&note); - new_cached_notes.push((note_key, new_cached_note.clone())); - new_cached_note - }; +fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { + let sub_kind = if let Some(sub_kind) = damus.subscriptions().get(subid) { + sub_kind + } else { + let n_subids = damus.subscriptions().len(); + warn!( + "got unknown eose subid {}, {} tracked subscriptions", + subid, n_subids + ); + return Ok(()); + }; - let _ = get_unknown_note_ids( - &damus.ndb, - &cached_note, - txn, - &note, - note.key().unwrap(), - &mut ids, - ); + match *sub_kind { + SubKind::Initial => { + let txn = Transaction::new(&damus.ndb)?; + UnknownIds::update(&txn, damus); + // this is possible if this is the first time + if damus.unknown_ids.ready_to_send() { + unknown_id_send(damus); + } } - } - - // This is mainly done to avoid the double mutable borrow that would happen - // if we tried to update the note_cache mutably in the loop above - for (note_key, note) in new_cached_notes { - damus.note_cache_mut().cache_mut().insert(note_key, note); - } - Ok(ids.into_iter().collect()) -} - -fn get_unknown_ids_filter(ids: &[UnknownId<'_>]) -> Option<Vec<Filter>> { - if ids.is_empty() { - return None; - } - - let mut filters: Vec<Filter> = vec![]; - - let pks: Vec<&[u8; 32]> = ids.iter().flat_map(|id| id.is_pubkey()).collect(); - if !pks.is_empty() { - let pk_filter = Filter::new().authors(pks).kinds([0]).build(); + // oneshot subs just close when they're done + SubKind::OneShot => { + let msg = ClientMessage::close(subid.to_string()); + damus.pool.send_to(&msg, relay_url); + } - filters.push(pk_filter); - } + SubKind::FetchingContactList(timeline_uid) => { + let timeline_ind = if let Some(i) = damus.find_timeline(timeline_uid) { + i + } else { + error!( + "timeline uid:{} not found for FetchingContactList", + timeline_uid + ); + return Ok(()); + }; - let note_ids: Vec<&[u8; 32]> = ids.iter().flat_map(|id| id.is_id()).collect(); - if !note_ids.is_empty() { - filters.push(Filter::new().ids(note_ids).build()); - } + let local_sub = if let FilterState::FetchingRemote(unisub) = + &damus.timelines[timeline_ind].filter + { + unisub.local + } else { + // TODO: we could have multiple contact list results, we need + // to check to see if this one is newer and use that instead + warn!( + "Expected timeline to have FetchingRemote state but was {:?}", + damus.timelines[timeline_ind].filter + ); + return Ok(()); + }; - Some(filters) -} + damus.timelines[timeline_ind].filter = FilterState::got_remote(local_sub); -fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { - if subid.starts_with("initial") { - let txn = Transaction::new(&damus.ndb)?; - let ids = get_unknown_ids(&txn, damus)?; - if let Some(filters) = get_unknown_ids_filter(&ids) { - info!("Getting {} unknown ids from {}", ids.len(), relay_url); - let msg = ClientMessage::req("unknown_ids".to_string(), filters); - damus.pool.send_to(&msg, relay_url); + /* + // see if we're fast enough to catch a processed contact list + let note_keys = damus.ndb.poll_for_notes(local_sub, 1); + if !note_keys.is_empty() { + debug!("fast! caught contact list from {relay_url} right away"); + let txn = Transaction::new(&damus.ndb)?; + let note_key = note_keys[0]; + let nr = damus.ndb.get_note_by_key(&txn, note_key)?; + let filter = filter::filter_from_tags(&nr)?.into_follow_filter(); + setup_initial_timeline(damus, timeline, &filter) + damus.timelines[timeline_ind].filter = FilterState::ready(filter); + } + */ } - } else if subid == "unknown_ids" { - let msg = ClientMessage::close("unknown_ids".to_string()); - damus.pool.send_to(&msg, relay_url); - } else { - warn!("got unknown eose subid {}", subid); } Ok(()) @@ -526,128 +543,6 @@ fn render_damus(damus: &mut Damus, ctx: &Context) { puffin_egui::profiler_window(ctx); } -struct Args { - timelines: Vec<Timeline>, - relays: Vec<String>, - is_mobile: Option<bool>, - keys: Vec<Keypair>, - since_optimize: bool, - light: bool, - dbpath: Option<String>, -} - -fn parse_args(args: &[String]) -> Args { - let mut res = Args { - timelines: vec![], - relays: vec![], - is_mobile: None, - keys: vec![], - light: false, - since_optimize: true, - dbpath: None, - }; - - let mut i = 0; - let len = args.len(); - while i < len { - let arg = &args[i]; - - if arg == "--mobile" { - res.is_mobile = Some(true); - } else if arg == "--light" { - res.light = true; - } else if arg == "--dark" { - res.light = false; - } else if arg == "--pub" || arg == "npub" { - // TODO: npub watch-only accounts - } else if arg == "--sec" || arg == "--nsec" { - i += 1; - let secstr = if let Some(next_arg) = args.get(i) { - next_arg - } else { - error!("sec argument missing?"); - continue; - }; - - if let Ok(sec) = SecretKey::parse(secstr) { - res.keys.push(Keypair::from_secret(sec)); - } else { - error!( - "failed to parse {} argument. Make sure to use hex or nsec.", - arg - ); - } - } else if arg == "--no-since-optimize" { - res.since_optimize = false; - } else if arg == "--filter" { - i += 1; - let filter = if let Some(next_arg) = args.get(i) { - next_arg - } else { - error!("filter argument missing?"); - continue; - }; - - if let Ok(filter) = Filter::from_json(filter) { - res.timelines.push(Timeline::new(vec![filter])); - } else { - error!("failed to parse filter '{}'", filter); - } - } else if arg == "--dbpath" { - i += 1; - let path = if let Some(next_arg) = args.get(i) { - next_arg - } else { - error!("dbpath argument missing?"); - continue; - }; - res.dbpath = Some(path.clone()); - } else if arg == "-r" || arg == "--relay" { - i += 1; - let relay = if let Some(next_arg) = args.get(i) { - next_arg - } else { - error!("relay argument missing?"); - continue; - }; - res.relays.push(relay.clone()); - } else if arg == "--filter-file" || arg == "-f" { - i += 1; - let filter_file = if let Some(next_arg) = args.get(i) { - next_arg - } else { - error!("filter file argument missing?"); - continue; - }; - - let data = if let Ok(data) = std::fs::read(filter_file) { - data - } else { - error!("failed to read filter file '{}'", filter_file); - continue; - }; - - if let Some(filter) = std::str::from_utf8(&data) - .ok() - .and_then(|s| Filter::from_json(s).ok()) - { - res.timelines.push(Timeline::new(vec![filter])); - } else { - error!("failed to parse filter in '{}'", filter_file); - } - } - - i += 1; - } - - if res.timelines.is_empty() { - let filter = Filter::from_json(include_str!("../queries/timeline.json")).unwrap(); - res.timelines.push(Timeline::new(vec![filter])); - } - - res -} - /* fn determine_key_storage_type() -> KeyStorageType { #[cfg(target_os = "macos")] @@ -675,7 +570,7 @@ impl Damus { args: Vec<String>, ) -> Self { // arg parsing - let parsed_args = parse_args(&args); + let parsed_args = Args::parse(&args); let is_mobile = parsed_args.is_mobile.unwrap_or(ui::is_compiled_as_mobile()); setup_cc(cc, is_mobile, parsed_args.light); @@ -728,9 +623,24 @@ impl Damus { pool }; + let account = account_manager + .get_selected_account() + .as_ref() + .map(|a| a.pubkey.bytes()); + let ndb = Ndb::new(&dbpath, &config).expect("ndb"); + + let mut timelines: Vec<Timeline> = Vec::with_capacity(parsed_args.columns.len()); + for col in parsed_args.columns { + if let Some(timeline) = col.into_timeline(&ndb, account) { + timelines.push(timeline); + } + } + Self { pool, is_mobile, + unknown_ids: UnknownIds::default(), + subscriptions: Subscriptions::default(), since_optimize: parsed_args.since_optimize, threads: Threads::default(), drafts: Drafts::default(), @@ -738,11 +648,10 @@ impl Damus { img_cache: ImageCache::new(imgcache_dir), note_cache: NoteCache::default(), selected_timeline: 0, - timelines: parsed_args.timelines, + timelines, textmode: false, - ndb: Ndb::new(&dbpath, &config).expect("ndb"), + ndb, account_manager, - //compose: "".to_string(), frame_history: FrameHistory::default(), show_account_switcher: false, show_global_popup: false, @@ -753,7 +662,10 @@ impl Damus { pub fn mock<P: AsRef<Path>>(data_path: P, is_mobile: bool) -> Self { let mut timelines: Vec<Timeline> = vec![]; let filter = Filter::from_json(include_str!("../queries/global.json")).unwrap(); - timelines.push(Timeline::new(vec![filter])); + timelines.push(Timeline::new( + ColumnKind::Universe, + FilterState::ready(vec![filter]), + )); let imgcache_dir = data_path.as_ref().join(ImageCache::rel_datadir()); let _ = std::fs::create_dir_all(imgcache_dir.clone()); @@ -762,6 +674,8 @@ impl Damus { config.set_ingester_threads(2); Self { is_mobile, + unknown_ids: UnknownIds::default(), + subscriptions: Subscriptions::default(), since_optimize: true, threads: Threads::default(), drafts: Drafts::default(), @@ -781,6 +695,20 @@ impl Damus { } } + pub fn find_timeline(&self, uid: u32) -> Option<usize> { + for (i, timeline) in self.timelines.iter().enumerate() { + if timeline.uid == uid { + return Some(i); + } + } + + None + } + + pub fn subscriptions(&mut self) -> &mut HashMap<String, SubKind> { + &mut self.subscriptions.subs + } + pub fn note_cache_mut(&mut self) -> &mut NoteCache { &mut self.note_cache } @@ -927,13 +855,17 @@ fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) { }; match unsubscribe { - Ok(DecrementResult::LastSubscriber(sub_id)) => { - if let Err(e) = app.ndb.unsubscribe(sub_id) { - error!("failed to unsubscribe from thread: {e}, subid:{sub_id}, {} active subscriptions", app.ndb.subscription_count()); + Ok(DecrementResult::LastSubscriber(sub)) => { + if let Err(e) = app.ndb.unsubscribe(sub) { + error!( + "failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions", + sub.id(), + app.ndb.subscription_count() + ); } else { info!( "Unsubscribed from thread subid:{}. {} active subscriptions", - sub_id, + sub.id(), app.ndb.subscription_count() ); } diff --git a/src/args.rs b/src/args.rs @@ -0,0 +1,189 @@ +use crate::column::{ColumnKind, PubkeySource}; +use crate::filter::FilterState; +use crate::timeline::Timeline; +use enostr::{Filter, Keypair, Pubkey, SecretKey}; +use nostrdb::Ndb; +use tracing::{error, info}; + +pub struct Args { + pub columns: Vec<ArgColumn>, + pub relays: Vec<String>, + pub is_mobile: Option<bool>, + pub keys: Vec<Keypair>, + pub since_optimize: bool, + pub light: bool, + pub dbpath: Option<String>, +} + +impl Args { + pub fn parse(args: &[String]) -> Self { + let mut res = Args { + columns: vec![], + relays: vec![], + is_mobile: None, + keys: vec![], + light: false, + since_optimize: true, + dbpath: None, + }; + + let mut i = 0; + let len = args.len(); + while i < len { + let arg = &args[i]; + + if arg == "--mobile" { + res.is_mobile = Some(true); + } else if arg == "--light" { + res.light = true; + } else if arg == "--dark" { + res.light = false; + } else if arg == "--pub" || arg == "--npub" { + i += 1; + let pubstr = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("sec argument missing?"); + continue; + }; + + if let Ok(pk) = Pubkey::parse(pubstr) { + res.keys.push(Keypair::only_pubkey(pk)); + } else { + error!( + "failed to parse {} argument. Make sure to use hex or npub.", + arg + ); + } + } else if arg == "--sec" || arg == "--nsec" { + i += 1; + let secstr = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("sec argument missing?"); + continue; + }; + + if let Ok(sec) = SecretKey::parse(secstr) { + res.keys.push(Keypair::from_secret(sec)); + } else { + error!( + "failed to parse {} argument. Make sure to use hex or nsec.", + arg + ); + } + } else if arg == "--no-since-optimize" { + res.since_optimize = false; + } else if arg == "--filter" { + i += 1; + let filter = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("filter argument missing?"); + continue; + }; + + if let Ok(filter) = Filter::from_json(filter) { + res.columns.push(ArgColumn::Generic(vec![filter])); + } else { + error!("failed to parse filter '{}'", filter); + } + } else if arg == "--dbpath" { + i += 1; + let path = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("dbpath argument missing?"); + continue; + }; + res.dbpath = Some(path.clone()); + } else if arg == "-r" || arg == "--relay" { + i += 1; + let relay = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("relay argument missing?"); + continue; + }; + res.relays.push(relay.clone()); + } else if arg == "--column" || arg == "-c" { + i += 1; + let column_name = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("column argument missing"); + continue; + }; + + if let Some(rest) = column_name.strip_prefix("contacts:") { + if let Ok(pubkey) = Pubkey::parse(rest) { + info!("got contact column for user {}", pubkey.hex()); + res.columns.push(ArgColumn::Column(ColumnKind::contact_list( + PubkeySource::Explicit(pubkey), + ))) + } else { + error!("error parsing contacts pubkey {}", &column_name[9..]); + continue; + } + } else if column_name == "contacts" { + res.columns.push(ArgColumn::Column(ColumnKind::contact_list( + PubkeySource::DeckAuthor, + ))) + } + } else if arg == "--filter-file" || arg == "-f" { + i += 1; + let filter_file = if let Some(next_arg) = args.get(i) { + next_arg + } else { + error!("filter file argument missing?"); + continue; + }; + + let data = if let Ok(data) = std::fs::read(filter_file) { + data + } else { + error!("failed to read filter file '{}'", filter_file); + continue; + }; + + if let Some(filter) = std::str::from_utf8(&data) + .ok() + .and_then(|s| Filter::from_json(s).ok()) + { + res.columns.push(ArgColumn::Generic(vec![filter])); + } else { + error!("failed to parse filter in '{}'", filter_file); + } + } + + i += 1; + } + + if res.columns.is_empty() { + let ck = ColumnKind::contact_list(PubkeySource::DeckAuthor); + info!("No columns set, setting up defaults: {:?}", ck); + res.columns.push(ArgColumn::Column(ck)); + } + + res + } +} + +/// A way to define columns from the commandline. Can be column kinds or +/// generic queries +pub enum ArgColumn { + Column(ColumnKind), + Generic(Vec<Filter>), +} + +impl ArgColumn { + pub fn into_timeline(self, ndb: &Ndb, user: Option<&[u8; 32]>) -> Option<Timeline> { + match self { + ArgColumn::Generic(filters) => Some(Timeline::new( + ColumnKind::Generic, + FilterState::ready(filters), + )), + ArgColumn::Column(ck) => ck.into_timeline(ndb, user), + } + } +} diff --git a/src/column.rs b/src/column.rs @@ -0,0 +1,97 @@ +use crate::error::FilterError; +use crate::filter::FilterState; +use crate::{timeline::Timeline, Error}; +use enostr::Pubkey; +use nostrdb::{Filter, Ndb, Transaction}; +use std::fmt::Display; +use tracing::{error, warn}; + +#[derive(Clone, Debug)] +pub enum PubkeySource { + Explicit(Pubkey), + DeckAuthor, +} + +#[derive(Debug)] +pub enum ListKind { + Contact(PubkeySource), +} + +/// +/// What kind of column is it? +/// - Follow List +/// - Notifications +/// - DM +/// - filter +/// - ... etc +#[derive(Debug)] +pub enum ColumnKind { + List(ListKind), + Universe, + + /// Generic filter + Generic, +} + +impl Display for ColumnKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ColumnKind::List(ListKind::Contact(_src)) => f.write_str("Contacts"), + ColumnKind::Generic => f.write_str("Timeline"), + ColumnKind::Universe => f.write_str("Universe"), + } + } +} + +impl ColumnKind { + pub fn contact_list(pk: PubkeySource) -> Self { + ColumnKind::List(ListKind::Contact(pk)) + } + + pub fn into_timeline(self, ndb: &Ndb, default_user: Option<&[u8; 32]>) -> Option<Timeline> { + match self { + ColumnKind::Universe => Some(Timeline::new( + ColumnKind::Universe, + FilterState::ready(vec![]), + )), + + ColumnKind::Generic => { + warn!("you can't convert a ColumnKind::Generic to a Timeline"); + None + } + + ColumnKind::List(ListKind::Contact(ref pk_src)) => { + let pk = match pk_src { + PubkeySource::DeckAuthor => default_user?, + PubkeySource::Explicit(pk) => pk.bytes(), + }; + + let contact_filter = Filter::new().authors([pk]).kinds([3]).limit(1).build(); + + let txn = Transaction::new(ndb).expect("txn"); + let results = ndb + .query(&txn, &[contact_filter.clone()], 1) + .expect("contact query failed?"); + + if results.is_empty() { + return Some(Timeline::new( + ColumnKind::contact_list(pk_src.to_owned()), + FilterState::needs_remote(vec![contact_filter.clone()]), + )); + } + + match Timeline::contact_list(&results[0].note) { + Err(Error::Filter(FilterError::EmptyContactList)) => Some(Timeline::new( + ColumnKind::contact_list(pk_src.to_owned()), + FilterState::needs_remote(vec![contact_filter]), + )), + Err(e) => { + error!("Unexpected error: {e}"); + None + } + Ok(tl) => Some(tl), + } + } + } + } +} diff --git a/src/error.rs b/src/error.rs @@ -1,5 +1,10 @@ use std::{fmt, io}; +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum FilterError { + EmptyContactList, +} + #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum SubscriptionError { //#[error("No active subscriptions")] @@ -36,6 +41,7 @@ impl fmt::Display for SubscriptionError { #[derive(Debug)] pub enum Error { SubscriptionError(SubscriptionError), + Filter(FilterError), LoadFailed, Io(io::Error), Nostr(enostr::Error), @@ -44,15 +50,34 @@ pub enum Error { Generic(String), } +impl Error { + pub fn empty_contact_list() -> Self { + Error::Filter(FilterError::EmptyContactList) + } +} + +impl fmt::Display for FilterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::EmptyContactList => { + write!(f, "empty contact list") + } + } + } +} + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::SubscriptionError(sub_err) => { - write!(f, "{sub_err}") + Self::SubscriptionError(e) => { + write!(f, "{e}") } Self::LoadFailed => { write!(f, "load failed") } + Self::Filter(e) => { + write!(f, "{e}") + } Self::Nostr(e) => write!(f, "{e}"), Self::Ndb(e) => write!(f, "{e}"), Self::Image(e) => write!(f, "{e}"), @@ -91,3 +116,9 @@ impl From<io::Error> for Error { Error::Io(err) } } + +impl From<FilterError> for Error { + fn from(err: FilterError) -> Self { + Error::Filter(err) + } +} diff --git a/src/filter.rs b/src/filter.rs @@ -1,5 +1,66 @@ +use crate::error::{Error, FilterError}; use crate::note::NoteRef; -use nostrdb::Filter; +use crate::Result; +use nostrdb::{Filter, FilterBuilder, Note, Subscription}; +use tracing::{debug, warn}; + +/// A unified subscription has a local and remote component. The remote subid +/// tracks data received remotely, and local +#[derive(Debug, Clone)] +pub struct UnifiedSubscription { + pub local: Subscription, + pub remote: String, +} + +/// We may need to fetch some data from relays before our filter is ready. +/// [`FilterState`] tracks this. +#[derive(Debug, Clone)] +pub enum FilterState { + NeedsRemote(Vec<Filter>), + FetchingRemote(UnifiedSubscription), + GotRemote(Subscription), + Ready(Vec<Filter>), + Broken(FilterError), +} + +impl FilterState { + /// We tried to fetch a filter but we wither got no data or the data + /// was corrupted, preventing us from getting to the Ready state. + /// Just mark the timeline as broken so that we can signal to the + /// user that something went wrong + pub fn broken(reason: FilterError) -> Self { + Self::Broken(reason) + } + + /// The filter is ready + pub fn ready(filter: Vec<Filter>) -> Self { + Self::Ready(filter) + } + + /// We need some data from relays before we can continue. Example: + /// for home timelines where we don't have a contact list yet. We + /// need to fetch the contact list before we have the right timeline + /// filter. + pub fn needs_remote(filter: Vec<Filter>) -> Self { + Self::NeedsRemote(filter) + } + + /// We got the remote data. Local data should be available to build + /// the filter for the [`FilterState::Ready`] state + pub fn got_remote(local_sub: Subscription) -> Self { + Self::GotRemote(local_sub) + } + + /// We have sent off a remote subscription to get data needed for the + /// filter. The string is the subscription id + pub fn fetching_remote(sub_id: String, local_sub: Subscription) -> Self { + let unified_sub = UnifiedSubscription { + local: local_sub, + remote: sub_id, + }; + Self::FetchingRemote(unified_sub) + } +} pub fn should_since_optimize(limit: u64, num_notes: usize) -> bool { // rough heuristic for bailing since optimization if we don't have enough notes @@ -30,3 +91,107 @@ pub fn default_limit() -> u64 { pub fn default_remote_limit() -> u64 { 150 } + +pub struct FilteredTags { + pub authors: Option<FilterBuilder>, + pub hashtags: Option<FilterBuilder>, +} + +impl FilteredTags { + pub fn into_follow_filter(self) -> Vec<Filter> { + self.into_filter([1]) + } + + // TODO: make this more general + pub fn into_filter<I>(self, kinds: I) -> Vec<Filter> + where + I: IntoIterator<Item = u64> + Copy, + { + let mut filters: Vec<Filter> = Vec::with_capacity(2); + + if let Some(authors) = self.authors { + filters.push(authors.kinds(kinds).build()) + } + + if let Some(hashtags) = self.hashtags { + filters.push(hashtags.kinds(kinds).build()) + } + + filters + } +} + +/// Create a filter from tags. This can be used to create a filter +/// from a contact list +pub fn filter_from_tags(note: &Note) -> Result<FilteredTags> { + let mut author_filter = Filter::new(); + let mut hashtag_filter = Filter::new(); + let mut author_res: Option<FilterBuilder> = None; + let mut hashtag_res: Option<FilterBuilder> = None; + let mut author_count = 0i32; + let mut hashtag_count = 0i32; + + let tags = note.tags(); + + author_filter.start_authors_field()?; + hashtag_filter.start_tags_field('t')?; + + for tag in tags { + if tag.count() < 2 { + continue; + } + + let t = if let Some(t) = tag.get_unchecked(0).variant().str() { + t + } else { + continue; + }; + + if t == "p" { + let author = if let Some(author) = tag.get_unchecked(1).variant().id() { + author + } else { + continue; + }; + + author_filter.add_id_element(author)?; + author_count += 1; + } else if t == "t" { + let hashtag = if let Some(hashtag) = tag.get_unchecked(1).variant().str() { + hashtag + } else { + continue; + }; + + hashtag_filter.add_str_element(hashtag)?; + hashtag_count += 1; + } + } + + author_filter.end_field(); + hashtag_filter.end_field(); + + if author_count == 0 && hashtag_count == 0 { + warn!("no authors or hashtags found in contact list"); + return Err(Error::empty_contact_list()); + } + + debug!( + "adding {} authors and {} hashtags to contact filter", + author_count, hashtag_count + ); + + // if we hit these ooms, we need to expand filter buffer size + if author_count > 0 { + author_res = Some(author_filter) + } + + if hashtag_count > 0 { + hashtag_res = Some(hashtag_filter) + } + + Ok(FilteredTags { + authors: author_res, + hashtags: hashtag_res, + }) +} diff --git a/src/lib.rs b/src/lib.rs @@ -8,7 +8,9 @@ pub mod account_manager; mod actionbar; pub mod app_creation; mod app_style; +mod args; mod colors; +mod column; mod draft; mod filter; mod fonts; @@ -26,12 +28,14 @@ mod profile; pub mod relay_pool_manager; mod result; mod route; +mod subscriptions; mod test_data; mod thread; mod time; mod timecache; mod timeline; pub mod ui; +mod unknowns; mod user_account; #[cfg(test)] diff --git a/src/subscriptions.rs b/src/subscriptions.rs @@ -0,0 +1,23 @@ +use std::collections::HashMap; + +pub enum SubKind { + /// Initial subscription. This is the first time we do a remote subscription + /// for a timeline + Initial, + + /// One shot requests, we can just close after we receive EOSE + OneShot, + + /// We are fetching a contact list so that we can use it for our follows + /// Filter. + // TODO: generalize this to any list? + FetchingContactList(u32), +} + +/// Subscriptions that need to be tracked at various stages. Sometimes we +/// need to do A, then B, then C. Tracking requests at various stages by +/// mapping uuid subids to explicit states happens here. +#[derive(Default)] +pub struct Subscriptions { + pub subs: HashMap<String, SubKind>, +} diff --git a/src/thread.rs b/src/thread.rs @@ -16,7 +16,7 @@ pub struct Thread { #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum DecrementResult { - LastSubscriber(u64), + LastSubscriber(Subscription), ActiveSubscribers, } @@ -54,7 +54,7 @@ impl Thread { let last_note = notes[0]; let filters = Thread::filters_since(root_id, last_note.created_at + 1); - if let Ok(results) = ndb.query(txn, filters, 1000) { + if let Ok(results) = ndb.query(txn, &filters, 1000) { debug!("got {} results from thread update", results.len()); results .into_iter() @@ -72,7 +72,7 @@ impl Thread { match self.subscribers.cmp(&0) { Ordering::Equal => { if let Some(sub) = self.subscription() { - Ok(DecrementResult::LastSubscriber(sub.id)) + Ok(DecrementResult::LastSubscriber(sub)) } else { Err(Error::no_active_sub()) } @@ -82,8 +82,8 @@ impl Thread { } } - pub fn subscription(&self) -> Option<&Subscription> { - self.sub.as_ref() + pub fn subscription(&self) -> Option<Subscription> { + self.sub } pub fn remote_subscription(&self) -> &Option<String> { @@ -171,7 +171,7 @@ impl Threads { // we don't have the thread, query for it! let filters = Thread::filters(root_id); - let notes = if let Ok(results) = ndb.query(txn, filters, 1000) { + let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) { results .into_iter() .map(NoteRef::from_query_result) diff --git a/src/timeline.rs b/src/timeline.rs @@ -1,15 +1,18 @@ -use crate::app::{get_unknown_note_ids, UnknownId}; +use crate::column::{ColumnKind, PubkeySource}; use crate::error::Error; use crate::note::NoteRef; use crate::notecache::CachedNote; +use crate::unknowns::UnknownIds; +use crate::{filter, filter::FilterState}; use crate::{Damus, Result}; +use std::sync::atomic::{AtomicU32, Ordering}; use crate::route::Route; use egui_virtual_list::VirtualList; -use nostrdb::{Filter, Note, Subscription, Transaction}; +use enostr::Pubkey; +use nostrdb::{Note, Subscription, Transaction}; use std::cell::RefCell; -use std::collections::HashSet; use std::rc::Rc; use tracing::{debug, error}; @@ -47,9 +50,9 @@ impl<'a> TimelineSource<'a> { } } - pub fn sub<'b>(self, app: &'b mut Damus, txn: &Transaction) -> Option<&'b Subscription> { + pub fn sub(self, app: &mut Damus, txn: &Transaction) -> Option<Subscription> { match self { - TimelineSource::Column { ind, .. } => app.timelines[ind].subscription.as_ref(), + TimelineSource::Column { ind, .. } => app.timelines[ind].subscription, TimelineSource::Thread(root_id) => { // TODO: replace all this with the raw entry api eventually @@ -64,45 +67,37 @@ impl<'a> TimelineSource<'a> { } } - pub fn poll_notes_into_view( - &self, - app: &mut Damus, - txn: &'a Transaction, - ids: &mut HashSet<UnknownId<'a>>, - ) -> Result<()> { - let sub_id = if let Some(sub_id) = self.sub(app, txn).map(|s| s.id) { - sub_id - } else { - return Err(Error::no_active_sub()); + /// Check local subscriptions for new notes and insert them into + /// timelines (threads, columns) + pub fn poll_notes_into_view(&self, app: &mut Damus) -> Result<()> { + let sub = { + let txn = Transaction::new(&app.ndb).expect("txn"); + if let Some(sub) = self.sub(app, &txn) { + sub + } else { + return Err(Error::no_active_sub()); + } }; - // - // TODO(BUG!): poll for these before the txn, otherwise we can hit - // a race condition where we hit the "no note??" expect below. This may - // require some refactoring due to the missing ids logic - // - let new_note_ids = app.ndb.poll_for_notes(sub_id, 100); + let new_note_ids = app.ndb.poll_for_notes(sub, 100); if new_note_ids.is_empty() { return Ok(()); } else { debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids); } + let txn = Transaction::new(&app.ndb).expect("txn"); let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); for key in new_note_ids { - let note = if let Ok(note) = app.ndb.get_note_by_key(txn, key) { + let note = if let Ok(note) = app.ndb.get_note_by_key(&txn, key) { note } else { error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key); continue; }; - let cached_note = app - .note_cache_mut() - .cached_note_or_insert(key, &note) - .clone(); - let _ = get_unknown_note_ids(&app.ndb, &cached_note, txn, &note, key, ids); + UnknownIds::update_from_note(&txn, app, &note); let created_at = note.created_at(); new_refs.push((note, NoteRef { key, created_at })); @@ -120,7 +115,7 @@ impl<'a> TimelineSource<'a> { let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect(); let reversed = false; - self.view(app, txn, ViewFilter::NotesAndReplies) + self.view(app, &txn, ViewFilter::NotesAndReplies) .insert(&refs, reversed); } @@ -139,7 +134,7 @@ impl<'a> TimelineSource<'a> { } } - self.view(app, txn, ViewFilter::Notes) + self.view(app, &txn, ViewFilter::Notes) .insert(&filtered_refs, reversed); } @@ -272,8 +267,13 @@ impl TimelineTab { } } +/// A column in a deck. Holds navigation state, loaded notes, column kind, etc. pub struct Timeline { - pub filter: Vec<Filter>, + pub uid: u32, + pub kind: ColumnKind, + // We may not have the filter loaded yet, so let's make it an option so + // that codepaths have to explicitly handle it + pub filter: FilterState, pub views: Vec<TimelineTab>, pub selected_view: i32, pub routes: Vec<Route>, @@ -285,17 +285,34 @@ pub struct Timeline { } impl Timeline { - pub fn new(filter: Vec<Filter>) -> Self { + /// Create a timeline from a contact list + pub fn contact_list(contact_list: &Note) -> Result<Self> { + let filter = filter::filter_from_tags(contact_list)?.into_follow_filter(); + let pk_src = PubkeySource::Explicit(Pubkey::new(*contact_list.pubkey())); + + Ok(Timeline::new( + ColumnKind::contact_list(pk_src), + FilterState::ready(filter), + )) + } + + pub fn new(kind: ColumnKind, filter: FilterState) -> Self { + // global unique id for all new timelines + static UIDS: AtomicU32 = AtomicU32::new(0); + let subscription: Option<Subscription> = None; let notes = TimelineTab::new(ViewFilter::Notes); let replies = TimelineTab::new(ViewFilter::NotesAndReplies); let views = vec![notes, replies]; let selected_view = 0; - let routes = vec![Route::Timeline("Timeline".to_string())]; + let routes = vec![Route::Timeline(format!("{}", kind))]; let navigating = false; let returning = false; + let uid = UIDS.fetch_add(1, Ordering::Relaxed); Timeline { + uid, + kind, navigating, returning, filter, diff --git a/src/ui/profile/picture.rs b/src/ui/profile/picture.rs @@ -144,7 +144,7 @@ mod preview { let mut pks = HashSet::new(); let mut keys = HashSet::new(); - for query_result in ndb.query(&txn, filters, 2000).unwrap() { + for query_result in ndb.query(&txn, &filters, 2000).unwrap() { pks.insert(query_result.note.pubkey()); } diff --git a/src/ui/thread.rs b/src/ui/thread.rs @@ -1,7 +1,6 @@ use crate::{actionbar::BarResult, timeline::TimelineSource, ui, Damus}; use nostrdb::{NoteKey, Transaction}; -use std::collections::HashSet; -use tracing::warn; +use tracing::{error, warn}; pub struct ThreadView<'a> { app: &'a mut Damus, @@ -72,11 +71,8 @@ impl<'a> ThreadView<'a> { }; // poll for new notes and insert them into our existing notes - { - let mut ids = HashSet::new(); - let _ = TimelineSource::Thread(root_id) - .poll_notes_into_view(self.app, &txn, &mut ids); - // TODO: do something with unknown ids + if let Err(e) = TimelineSource::Thread(root_id).poll_notes_into_view(self.app) { + error!("Thread::poll_notes_into_view: {e}"); } let (len, list) = { diff --git a/src/unknowns.rs b/src/unknowns.rs @@ -0,0 +1,278 @@ +use crate::notecache::CachedNote; +use crate::timeline::ViewFilter; +use crate::{Damus, Result}; +use enostr::{Filter, NoteId, Pubkey}; +use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction}; +use std::collections::HashSet; +use std::time::{Duration, Instant}; +use tracing::error; + +/// Unknown Id searcher +#[derive(Default)] +pub struct UnknownIds { + ids: HashSet<UnknownId>, + first_updated: Option<Instant>, + last_updated: Option<Instant>, +} + +impl UnknownIds { + /// Simple debouncer + pub fn ready_to_send(&self) -> bool { + if self.ids.is_empty() { + return false; + } + + // we trigger on first set + if self.first_updated == self.last_updated { + return true; + } + + let last_updated = if let Some(last) = self.last_updated { + last + } else { + // if we've + return true; + }; + + Instant::now() - last_updated >= Duration::from_secs(2) + } + + pub fn ids(&self) -> &HashSet<UnknownId> { + &self.ids + } + + pub fn ids_mut(&mut self) -> &mut HashSet<UnknownId> { + &mut self.ids + } + + pub fn clear(&mut self) { + self.ids = HashSet::default(); + } + + pub fn filter(&self) -> Option<Vec<Filter>> { + let ids: Vec<&UnknownId> = self.ids.iter().collect(); + get_unknown_ids_filter(&ids) + } + + /// We've updated some unknown ids, update the last_updated time to now + pub fn mark_updated(&mut self) { + let now = Instant::now(); + if self.first_updated.is_none() { + self.first_updated = Some(now); + } + self.last_updated = Some(now); + } + + pub fn update_from_note(txn: &Transaction, app: &mut Damus, note: &Note) -> bool { + let before = app.unknown_ids.ids().len(); + let key = note.key().expect("note key"); + let cached_note = app + .note_cache_mut() + .cached_note_or_insert(key, note) + .clone(); + if let Err(e) = + get_unknown_note_ids(&app.ndb, &cached_note, txn, note, app.unknown_ids.ids_mut()) + { + error!("UnknownIds::update_from_note {e}"); + } + let after = app.unknown_ids.ids().len(); + + if before != after { + app.unknown_ids.mark_updated(); + true + } else { + false + } + } + + pub fn update(txn: &Transaction, app: &mut Damus) -> bool { + let before = app.unknown_ids.ids().len(); + if let Err(e) = get_unknown_ids(txn, app) { + error!("UnknownIds::update {e}"); + } + let after = app.unknown_ids.ids().len(); + + if before != after { + app.unknown_ids.mark_updated(); + true + } else { + false + } + } +} + +#[derive(Hash, Clone, Copy, PartialEq, Eq)] +pub enum UnknownId { + Pubkey(Pubkey), + Id(NoteId), +} + +impl UnknownId { + pub fn is_pubkey(&self) -> Option<&Pubkey> { + match self { + UnknownId::Pubkey(pk) => Some(pk), + _ => None, + } + } + + pub fn is_id(&self) -> Option<&NoteId> { + match self { + UnknownId::Id(id) => Some(id), + _ => None, + } + } +} + +/// Look for missing notes in various parts of notes that we see: +/// +/// - pubkeys and notes mentioned inside the note +/// - notes being replied to +/// +/// We return all of this in a HashSet so that we can fetch these from +/// remote relays. +/// +pub fn get_unknown_note_ids<'a>( + ndb: &Ndb, + cached_note: &CachedNote, + txn: &'a Transaction, + note: &Note<'a>, + ids: &mut HashSet<UnknownId>, +) -> Result<()> { + // the author pubkey + + if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { + ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); + } + + // pull notes that notes are replying to + if cached_note.reply.root.is_some() { + let note_reply = cached_note.reply.borrow(note.tags()); + if let Some(root) = note_reply.root() { + if ndb.get_note_by_id(txn, root.id).is_err() { + ids.insert(UnknownId::Id(NoteId::new(*root.id))); + } + } + + if !note_reply.is_reply_to_root() { + if let Some(reply) = note_reply.reply() { + if ndb.get_note_by_id(txn, reply.id).is_err() { + ids.insert(UnknownId::Id(NoteId::new(*reply.id))); + } + } + } + } + + let blocks = ndb.get_blocks_by_key(txn, note.key().expect("note key"))?; + for block in blocks.iter(note) { + if block.blocktype() != BlockType::MentionBech32 { + continue; + } + + match block.as_mention().unwrap() { + Mention::Pubkey(npub) => { + if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() { + ids.insert(UnknownId::Pubkey(Pubkey::new(*npub.pubkey()))); + } + } + Mention::Profile(nprofile) => { + if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() { + ids.insert(UnknownId::Pubkey(Pubkey::new(*nprofile.pubkey()))); + } + } + Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) { + Err(_) => { + ids.insert(UnknownId::Id(NoteId::new(*ev.id()))); + if let Some(pk) = ev.pubkey() { + if ndb.get_profile_by_pubkey(txn, pk).is_err() { + ids.insert(UnknownId::Pubkey(Pubkey::new(*pk))); + } + } + } + Ok(note) => { + if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { + ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); + } + } + }, + Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) { + Err(_) => { + ids.insert(UnknownId::Id(NoteId::new(*note.id()))); + } + Ok(note) => { + if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { + ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); + } + } + }, + _ => {} + } + } + + Ok(()) +} + +fn get_unknown_ids(txn: &Transaction, damus: &mut Damus) -> Result<()> { + #[cfg(feature = "profiling")] + puffin::profile_function!(); + + let mut new_cached_notes: Vec<(NoteKey, CachedNote)> = vec![]; + + for timeline in &damus.timelines { + for noteref in timeline.notes(ViewFilter::NotesAndReplies) { + let note = damus.ndb.get_note_by_key(txn, noteref.key)?; + let note_key = note.key().unwrap(); + let cached_note = damus.note_cache().cached_note(noteref.key); + let cached_note = if let Some(cn) = cached_note { + cn.clone() + } else { + let new_cached_note = CachedNote::new(&note); + new_cached_notes.push((note_key, new_cached_note.clone())); + new_cached_note + }; + + let _ = get_unknown_note_ids( + &damus.ndb, + &cached_note, + txn, + &note, + damus.unknown_ids.ids_mut(), + ); + } + } + + // This is mainly done to avoid the double mutable borrow that would happen + // if we tried to update the note_cache mutably in the loop above + for (note_key, note) in new_cached_notes { + damus.note_cache_mut().cache_mut().insert(note_key, note); + } + + Ok(()) +} + +fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> { + if ids.is_empty() { + return None; + } + + let ids = &ids[0..500.min(ids.len())]; + let mut filters: Vec<Filter> = vec![]; + + let pks: Vec<&[u8; 32]> = ids + .iter() + .flat_map(|id| id.is_pubkey().map(|pk| pk.bytes())) + .collect(); + if !pks.is_empty() { + let pk_filter = Filter::new().authors(pks).kinds([0]).build(); + filters.push(pk_filter); + } + + let note_ids: Vec<&[u8; 32]> = ids + .iter() + .flat_map(|id| id.is_id().map(|id| id.bytes())) + .collect(); + if !note_ids.is_empty() { + filters.push(Filter::new().ids(note_ids).build()); + } + + Some(filters) +}