nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

db.rs (26881B)


      1 //! Event persistence and querying
      2 //use crate::config::SETTINGS;
      3 use crate::config::Settings;
      4 use crate::error::{Error, Result};
      5 use crate::event::{single_char_tagname, Event};
      6 use crate::hexrange::hex_range;
      7 use crate::hexrange::HexSearch;
      8 use crate::nip05;
      9 use crate::schema::{upgrade_db, STARTUP_SQL};
     10 use crate::subscription::ReqFilter;
     11 use crate::subscription::Subscription;
     12 use crate::utils::{is_hex, is_lower_hex};
     13 use governor::clock::Clock;
     14 use governor::{Quota, RateLimiter};
     15 use hex;
     16 use r2d2;
     17 use r2d2_sqlite::SqliteConnectionManager;
     18 use rusqlite::params;
     19 use rusqlite::types::ToSql;
     20 use rusqlite::OpenFlags;
     21 use std::fmt::Write as _;
     22 use std::path::Path;
     23 use std::thread;
     24 use std::time::Duration;
     25 use std::time::Instant;
     26 use tokio::task;
     27 use tracing::{debug, info, trace, warn};
     28 
     29 pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
     30 pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
     31 
     32 /// Events submitted from a client, with a return channel for notices
     33 pub struct SubmittedEvent {
     34     pub event: Event,
     35     pub notice_tx: tokio::sync::mpsc::Sender<String>,
     36 }
     37 
     38 /// Database file
     39 pub const DB_FILE: &str = "nostr.db";
     40 
     41 /// Build a database connection pool.
     42 /// # Panics
     43 ///
     44 /// Will panic if the pool could not be created.
     45 #[must_use]
     46 pub fn build_pool(
     47     name: &str,
     48     settings: &Settings,
     49     flags: OpenFlags,
     50     min_size: u32,
     51     max_size: u32,
     52     wait_for_db: bool,
     53 ) -> SqlitePool {
     54     let db_dir = &settings.database.data_directory;
     55     let full_path = Path::new(db_dir).join(DB_FILE);
     56     // small hack; if the database doesn't exist yet, that means the
     57     // writer thread hasn't finished.  Give it a chance to work.  This
     58     // is only an issue with the first time we run.
     59     if !settings.database.in_memory {
     60         while !full_path.exists() && wait_for_db {
     61             debug!("Database reader pool is waiting on the database to be created...");
     62             thread::sleep(Duration::from_millis(500));
     63         }
     64     }
     65     let manager = if settings.database.in_memory {
     66         SqliteConnectionManager::memory()
     67             .with_flags(flags)
     68             .with_init(|c| c.execute_batch(STARTUP_SQL))
     69     } else {
     70         SqliteConnectionManager::file(&full_path)
     71             .with_flags(flags)
     72             .with_init(|c| c.execute_batch(STARTUP_SQL))
     73     };
     74     let pool: SqlitePool = r2d2::Pool::builder()
     75         .test_on_check_out(true) // no noticeable performance hit
     76         .min_idle(Some(min_size))
     77         .max_size(max_size)
     78         .build(manager)
     79         .unwrap();
     80     info!(
     81         "Built a connection pool {:?} (min={}, max={})",
     82         name, min_size, max_size
     83     );
     84     pool
     85 }
     86 
     87 /// Spawn a database writer that persists events to the SQLite store.
     88 pub async fn db_writer(
     89     settings: Settings,
     90     mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>,
     91     bcast_tx: tokio::sync::broadcast::Sender<Event>,
     92     metadata_tx: tokio::sync::broadcast::Sender<Event>,
     93     mut shutdown: tokio::sync::broadcast::Receiver<()>,
     94 ) -> tokio::task::JoinHandle<Result<()>> {
     95     // are we performing NIP-05 checking?
     96     let nip05_active = settings.verified_users.is_active();
     97     // are we requriing NIP-05 user verification?
     98     let nip05_enabled = settings.verified_users.is_enabled();
     99 
    100     task::spawn_blocking(move || {
    101         let db_dir = &settings.database.data_directory;
    102         let full_path = Path::new(db_dir).join(DB_FILE);
    103         // create a connection pool
    104         let pool = build_pool(
    105             "event writer",
    106             &settings,
    107             OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
    108             1,
    109             4,
    110             false,
    111         );
    112         if settings.database.in_memory {
    113             info!("using in-memory database, this will not persist a restart!");
    114         } else {
    115             info!("opened database {:?} for writing", full_path);
    116         }
    117         upgrade_db(&mut pool.get()?)?;
    118 
    119         // Make a copy of the whitelist
    120         let whitelist = &settings.authorization.pubkey_whitelist.clone();
    121 
    122         // get rate limit settings
    123         let rps_setting = settings.limits.messages_per_sec;
    124         let mut most_recent_rate_limit = Instant::now();
    125         let mut lim_opt = None;
    126         let clock = governor::clock::QuantaClock::default();
    127         if let Some(rps) = rps_setting {
    128             if rps > 0 {
    129                 info!("Enabling rate limits for event creation ({}/sec)", rps);
    130                 let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
    131                 lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
    132             }
    133         }
    134         loop {
    135             if shutdown.try_recv().is_ok() {
    136                 info!("shutting down database writer");
    137                 break;
    138             }
    139             // call blocking read on channel
    140             let next_event = event_rx.blocking_recv();
    141             // if the channel has closed, we will never get work
    142             if next_event.is_none() {
    143                 break;
    144             }
    145             // track if an event write occurred; this is used to
    146             // update the rate limiter
    147             let mut event_write = false;
    148             let subm_event = next_event.unwrap();
    149             let event = subm_event.event;
    150             let notice_tx = subm_event.notice_tx;
    151             // check if this event is authorized.
    152             if let Some(allowed_addrs) = whitelist {
    153                 // TODO: incorporate delegated pubkeys
    154                 // if the event address is not in allowed_addrs.
    155                 if !allowed_addrs.contains(&event.pubkey) {
    156                     info!(
    157                         "Rejecting event {}, unauthorized author",
    158                         event.get_event_id_prefix()
    159                     );
    160                     notice_tx
    161                         .try_send("pubkey is not allowed to publish to this relay".to_owned())
    162                         .ok();
    163                     continue;
    164                 }
    165             }
    166 
    167             // send any metadata events to the NIP-05 verifier
    168             if nip05_active && event.is_kind_metadata() {
    169                 // we are sending this prior to even deciding if we
    170                 // persist it.  this allows the nip05 module to
    171                 // inspect it, update if necessary, or persist a new
    172                 // event and broadcast it itself.
    173                 metadata_tx.send(event.clone()).ok();
    174             }
    175 
    176             // check for  NIP-05 verification
    177             if nip05_enabled {
    178                 match nip05::query_latest_user_verification(pool.get()?, event.pubkey.to_owned()) {
    179                     Ok(uv) => {
    180                         if uv.is_valid(&settings.verified_users) {
    181                             info!(
    182                                 "new event from verified author ({:?},{:?})",
    183                                 uv.name.to_string(),
    184                                 event.get_author_prefix()
    185                             );
    186                         } else {
    187                             info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
    188                                   uv.name.to_string(),
    189                                   event.get_author_prefix()
    190                             );
    191                             notice_tx
    192                                 .try_send(
    193                                     "NIP-05 verification is no longer valid (expired/wrong domain)"
    194                                         .to_owned(),
    195                                 )
    196                                 .ok();
    197                             continue;
    198                         }
    199                     }
    200                     Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => {
    201                         debug!(
    202                             "no verification records found for pubkey: {:?}",
    203                             event.get_author_prefix()
    204                         );
    205                         notice_tx
    206                             .try_send("NIP-05 verification needed to publish events".to_owned())
    207                             .ok();
    208                         continue;
    209                     }
    210                     Err(e) => {
    211                         warn!("checking nip05 verification status failed: {:?}", e);
    212                         continue;
    213                     }
    214                 }
    215             }
    216             // TODO: cache recent list of authors to remove a DB call.
    217             let start = Instant::now();
    218             if event.kind >= 20000 && event.kind < 30000 {
    219                 bcast_tx.send(event.clone()).ok();
    220                 info!(
    221                     "published ephemeral event: {:?} from: {:?} in: {:?}",
    222                     event.get_event_id_prefix(),
    223                     event.get_author_prefix(),
    224                     start.elapsed()
    225                 );
    226                 event_write = true
    227             } else {
    228                 match write_event(&mut pool.get()?, &event) {
    229                     Ok(updated) => {
    230                         if updated == 0 {
    231                             trace!("ignoring duplicate or deleted event");
    232                         } else {
    233                             info!(
    234                                 "persisted event: {:?} from: {:?} in: {:?}",
    235                                 event.get_event_id_prefix(),
    236                                 event.get_author_prefix(),
    237                                 start.elapsed()
    238                             );
    239                             event_write = true;
    240                             // send this out to all clients
    241                             bcast_tx.send(event.clone()).ok();
    242                         }
    243                     }
    244                     Err(err) => {
    245                         warn!("event insert failed: {:?}", err);
    246                         notice_tx
    247                             .try_send(
    248                                 "relay experienced an error trying to publish the latest event"
    249                                     .to_owned(),
    250                             )
    251                             .ok();
    252                     }
    253                 }
    254             }
    255 
    256             // use rate limit, if defined, and if an event was actually written.
    257             if event_write {
    258                 if let Some(ref lim) = lim_opt {
    259                     if let Err(n) = lim.check() {
    260                         let wait_for = n.wait_time_from(clock.now());
    261                         // check if we have recently logged rate
    262                         // limits, but print out a message only once
    263                         // per second.
    264                         if most_recent_rate_limit.elapsed().as_secs() > 10 {
    265                             warn!(
    266                                 "rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds)",
    267                                 wait_for
    268                             );
    269                             // reset last rate limit message
    270                             most_recent_rate_limit = Instant::now();
    271                         }
    272                         // block event writes, allowing them to queue up
    273                         thread::sleep(wait_for);
    274                         continue;
    275                     }
    276                 }
    277             }
    278         }
    279         info!("database connection closed");
    280         Ok(())
    281     })
    282 }
    283 
    284 /// Persist an event to the database, returning rows added.
    285 pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
    286     // start transaction
    287     let tx = conn.transaction()?;
    288     // get relevant fields from event and convert to blobs.
    289     let id_blob = hex::decode(&e.id).ok();
    290     let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
    291     let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
    292     let event_str = serde_json::to_string(&e).ok();
    293     // ignore if the event hash is a duplicate.
    294     let mut ins_count = tx.execute(
    295         "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
    296         params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
    297     )?;
    298     if ins_count == 0 {
    299         // if the event was a duplicate, no need to insert event or
    300         // pubkey references.  This will abort the txn.
    301         return Ok(ins_count);
    302     }
    303     // remember primary key of the event most recently inserted.
    304     let ev_id = tx.last_insert_rowid();
    305     // add all tags to the tag table
    306     for tag in e.tags.iter() {
    307         // ensure we have 2 values.
    308         if tag.len() >= 2 {
    309             let tagname = &tag[0];
    310             let tagval = &tag[1];
    311             // only single-char tags are searchable
    312             let tagchar_opt = single_char_tagname(tagname);
    313             match &tagchar_opt {
    314                 Some(_) => {
    315                     // if tagvalue is lowercase hex;
    316                     if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
    317                         tx.execute(
    318 			    "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
    319 			    params![ev_id, &tagname, hex::decode(tagval).ok()],
    320 			)?;
    321                     } else {
    322                         tx.execute(
    323                             "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
    324                             params![ev_id, &tagname, &tagval],
    325                         )?;
    326                     }
    327                 }
    328                 None => {}
    329             }
    330         }
    331     }
    332     // if this event is replaceable update, hide every other replaceable
    333     // event with the same kind from the same author that was issued
    334     // earlier than this.
    335     if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
    336         let update_count = tx.execute(
    337             "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
    338             params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
    339         )?;
    340         if update_count > 0 {
    341             info!(
    342                 "hid {} older replaceable kind {} events for author: {:?}",
    343                 update_count,
    344                 e.kind,
    345                 e.get_author_prefix()
    346             );
    347         }
    348     }
    349     // if this event is a deletion, hide the referenced events from the same author.
    350     if e.kind == 5 {
    351         let event_candidates = e.tag_values_by_name("e");
    352         // first parameter will be author
    353         let mut params: Vec<Box<dyn ToSql>> = vec![Box::new(hex::decode(&e.pubkey)?)];
    354         event_candidates
    355             .iter()
    356             .filter(|x| is_hex(x) && x.len() == 64)
    357             .filter_map(|x| hex::decode(x).ok())
    358             .for_each(|x| params.push(Box::new(x)));
    359         let query = format!(
    360             "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({})",
    361             repeat_vars(params.len() - 1)
    362         );
    363         let mut stmt = tx.prepare(&query)?;
    364         let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
    365         info!(
    366             "hid {} deleted events for author {:?}",
    367             update_count,
    368             e.get_author_prefix()
    369         );
    370     } else {
    371         // check if a deletion has already been recorded for this event.
    372         // Only relevant for non-deletion events
    373         let del_count = tx.query_row(
    374 	    "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;",
    375 	    params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0));
    376         // check if a the query returned a result, meaning we should
    377         // hid the current event
    378         if del_count.ok().is_some() {
    379             // a deletion already existed, mark original event as hidden.
    380             info!(
    381                 "hid event: {:?} due to existing deletion by author: {:?}",
    382                 e.get_event_id_prefix(),
    383                 e.get_author_prefix()
    384             );
    385             let _update_count =
    386                 tx.execute("UPDATE event SET hidden=TRUE WHERE id=?", params![ev_id])?;
    387             // event was deleted, so let caller know nothing new
    388             // arrived, preventing this from being sent to active
    389             // subscriptions
    390             ins_count = 0;
    391         }
    392     }
    393     tx.commit()?;
    394     Ok(ins_count)
    395 }
    396 
    397 /// Serialized event associated with a specific subscription request.
    398 #[derive(PartialEq, Eq, Debug, Clone)]
    399 pub struct QueryResult {
    400     /// Subscription identifier
    401     pub sub_id: String,
    402     /// Serialized event
    403     pub event: String,
    404 }
    405 
    406 /// Produce a arbitrary list of '?' parameters.
    407 fn repeat_vars(count: usize) -> String {
    408     if count == 0 {
    409         return "".to_owned();
    410     }
    411     let mut s = "?,".repeat(count);
    412     // Remove trailing comma
    413     s.pop();
    414     s
    415 }
    416 
    417 /// Create a dynamic SQL subquery and params from a subscription filter.
    418 fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
    419     // build a dynamic SQL query.  all user-input is either an integer
    420     // (sqli-safe), or a string that is filtered to only contain
    421     // hexadecimal characters.  Strings that require escaping (tag
    422     // names/values) use parameters.
    423 
    424     // if the filter is malformed, don't return anything.
    425     if f.force_no_match {
    426         let empty_query =
    427             "SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned();
    428         // query parameters for SQLite
    429         let empty_params: Vec<Box<dyn ToSql>> = vec![];
    430         return (empty_query, empty_params);
    431     }
    432 
    433     let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e  ".to_owned();
    434     // query parameters for SQLite
    435     let mut params: Vec<Box<dyn ToSql>> = vec![];
    436 
    437     // individual filter components (single conditions such as an author or event ID)
    438     let mut filter_components: Vec<String> = Vec::new();
    439     // Query for "authors", allowing prefix matches
    440     if let Some(authvec) = &f.authors {
    441         // take each author and convert to a hexsearch
    442         let mut auth_searches: Vec<String> = vec![];
    443         for auth in authvec {
    444             match hex_range(auth) {
    445                 Some(HexSearch::Exact(ex)) => {
    446                     auth_searches.push("author=? OR delegated_by=?".to_owned());
    447                     params.push(Box::new(ex.clone()));
    448                     params.push(Box::new(ex));
    449                 }
    450                 Some(HexSearch::Range(lower, upper)) => {
    451                     auth_searches.push(
    452                         "(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
    453                     );
    454                     params.push(Box::new(lower.clone()));
    455                     params.push(Box::new(upper.clone()));
    456                     params.push(Box::new(lower));
    457                     params.push(Box::new(upper));
    458                 }
    459                 Some(HexSearch::LowerOnly(lower)) => {
    460                     auth_searches.push("author>? OR delegated_by>?".to_owned());
    461                     params.push(Box::new(lower.clone()));
    462                     params.push(Box::new(lower));
    463                 }
    464                 None => {
    465                     info!("Could not parse hex range from author {:?}", auth);
    466                 }
    467             }
    468         }
    469         let authors_clause = format!("({})", auth_searches.join(" OR "));
    470         filter_components.push(authors_clause);
    471     }
    472     // Query for Kind
    473     if let Some(ks) = &f.kinds {
    474         // kind is number, no escaping needed
    475         let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
    476         let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
    477         filter_components.push(kind_clause);
    478     }
    479     // Query for event, allowing prefix matches
    480     if let Some(idvec) = &f.ids {
    481         // take each author and convert to a hexsearch
    482         let mut id_searches: Vec<String> = vec![];
    483         for id in idvec {
    484             match hex_range(id) {
    485                 Some(HexSearch::Exact(ex)) => {
    486                     id_searches.push("event_hash=?".to_owned());
    487                     params.push(Box::new(ex));
    488                 }
    489                 Some(HexSearch::Range(lower, upper)) => {
    490                     id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
    491                     params.push(Box::new(lower));
    492                     params.push(Box::new(upper));
    493                 }
    494                 Some(HexSearch::LowerOnly(lower)) => {
    495                     id_searches.push("event_hash>?".to_owned());
    496                     params.push(Box::new(lower));
    497                 }
    498                 None => {
    499                     info!("Could not parse hex range from id {:?}", id);
    500                 }
    501             }
    502         }
    503         let id_clause = format!("({})", id_searches.join(" OR "));
    504         filter_components.push(id_clause);
    505     }
    506     // Query for tags
    507     if let Some(map) = &f.tags {
    508         for (key, val) in map.iter() {
    509             let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
    510             let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
    511             for v in val {
    512                 if (v.len() % 2 == 0) && is_lower_hex(v) {
    513                     if let Ok(h) = hex::decode(v) {
    514                         blob_vals.push(Box::new(h));
    515                     }
    516                 } else {
    517                     str_vals.push(Box::new(v.to_owned()));
    518                 }
    519             }
    520             // create clauses with "?" params for each tag value being searched
    521             let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
    522             let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
    523             // find evidence of the target tag name/value existing for this event.
    524             let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
    525             // add the tag name as the first parameter
    526             params.push(Box::new(key.to_string()));
    527             // add all tag values that are plain strings as params
    528             params.append(&mut str_vals);
    529             // add all tag values that are blobs as params
    530             params.append(&mut blob_vals);
    531             filter_components.push(tag_clause);
    532         }
    533     }
    534     // Query for timestamp
    535     if f.since.is_some() {
    536         let created_clause = format!("created_at > {}", f.since.unwrap());
    537         filter_components.push(created_clause);
    538     }
    539     // Query for timestamp
    540     if f.until.is_some() {
    541         let until_clause = format!("created_at < {}", f.until.unwrap());
    542         filter_components.push(until_clause);
    543     }
    544     // never display hidden events
    545     query.push_str(" WHERE hidden!=TRUE");
    546     // build filter component conditions
    547     if !filter_components.is_empty() {
    548         query.push_str(" AND ");
    549         query.push_str(&filter_components.join(" AND "));
    550     }
    551     // Apply per-filter limit to this subquery.
    552     // The use of a LIMIT implies a DESC order, to capture only the most recent events.
    553     if let Some(lim) = f.limit {
    554         let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {}", lim);
    555     } else {
    556         query.push_str(" ORDER BY e.created_at ASC")
    557     }
    558     (query, params)
    559 }
    560 
    561 /// Create a dynamic SQL query string and params from a subscription.
    562 fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
    563     // build a dynamic SQL query for an entire subscription, based on
    564     // SQL subqueries for filters.
    565     let mut subqueries: Vec<String> = Vec::new();
    566     // subquery params
    567     let mut params: Vec<Box<dyn ToSql>> = vec![];
    568     // for every filter in the subscription, generate a subquery
    569     for f in sub.filters.iter() {
    570         let (f_subquery, mut f_params) = query_from_filter(f);
    571         subqueries.push(f_subquery);
    572         params.append(&mut f_params);
    573     }
    574     // encapsulate subqueries into select statements
    575     let subqueries_selects: Vec<String> = subqueries
    576         .iter()
    577         .map(|s| format!("SELECT content, created_at FROM ({})", s))
    578         .collect();
    579     let query: String = subqueries_selects.join(" UNION ");
    580     trace!("final query string: {}", query);
    581     (query, params)
    582 }
    583 
    584 /// Perform a database query using a subscription.
    585 ///
    586 /// The [`Subscription`] is converted into a SQL query.  Each result
    587 /// is published on the `query_tx` channel as it is returned.  If a
    588 /// message becomes available on the `abandon_query_rx` channel, the
    589 /// query is immediately aborted.
    590 pub async fn db_query(
    591     sub: Subscription,
    592     client_id: String,
    593     pool: SqlitePool,
    594     query_tx: tokio::sync::mpsc::Sender<QueryResult>,
    595     mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
    596 ) {
    597     task::spawn_blocking(move || {
    598         trace!("going to query for: {:?}", sub);
    599         let mut row_count: usize = 0;
    600         let start = Instant::now();
    601         // generate SQL query
    602         let (q, p) = query_from_sub(&sub);
    603         trace!("SQL generated in {:?}", start.elapsed());
    604         // show pool stats
    605         debug!("DB pool stats: {:?}", pool.state());
    606         let start = Instant::now();
    607         if let Ok(conn) = pool.get() {
    608             // execute the query. Don't cache, since queries vary so much.
    609             let mut stmt = conn.prepare(&q)?;
    610             let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
    611             let mut first_result = true;
    612             while let Some(row) = event_rows.next()? {
    613                 if first_result {
    614                     debug!(
    615                         "time to first result: {:?} (cid={}, sub={:?})",
    616                         start.elapsed(),
    617                         client_id,
    618                         sub.id
    619                     );
    620                     first_result = false;
    621                 }
    622                 // check if this is still active
    623                 // TODO:  check every N rows
    624                 if abandon_query_rx.try_recv().is_ok() {
    625                     debug!("query aborted (sub={:?})", sub.id);
    626                     return Ok(());
    627                 }
    628                 row_count += 1;
    629                 let event_json = row.get(0)?;
    630                 query_tx
    631                     .blocking_send(QueryResult {
    632                         sub_id: sub.get_id(),
    633                         event: event_json,
    634                     })
    635                     .ok();
    636             }
    637             query_tx
    638                 .blocking_send(QueryResult {
    639                     sub_id: sub.get_id(),
    640                     event: "EOSE".to_string(),
    641                 })
    642                 .ok();
    643             debug!(
    644                 "query completed ({} rows) in {:?} (cid={}, sub={:?})",
    645                 row_count,
    646                 start.elapsed(),
    647                 client_id,
    648                 sub.id
    649             );
    650         } else {
    651             warn!("Could not get a database connection for querying");
    652         }
    653         let ok: Result<()> = Ok(());
    654         ok
    655     });
    656 }