nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

commit 035cf34673ae23407bda6656eef505b3178482ab
parent be8170342edb6af1cbd3974a22f65d53e5c306e0
Author: Greg Heartsfield <scsibug@imap.cc>
Date:   Wed, 17 Aug 2022 16:34:11 -0700

fix(NIP-12): correctly search for mixed-case hex-like tags

Only lowercase and even-length tag values are stored as binary BLOBs.
Previously there was an error which search results from being returned
if the tag value was mixed-case and could be interpreted as hex.

A new database migration has been created to repair the `tag` table
for existing relays.

fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/37

Diffstat:
Msrc/db.rs | 60++++++++++++++++++++++++++++++++++++------------------------
Msrc/event.rs | 6+++---
Msrc/main.rs | 23+++++++++++------------
Msrc/schema.rs | 75++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
Msrc/subscription.rs | 3---
Msrc/utils.rs | 18++++++++++++++++++
6 files changed, 132 insertions(+), 53 deletions(-)

diff --git a/src/db.rs b/src/db.rs @@ -2,14 +2,14 @@ use crate::config::SETTINGS; use crate::error::Error; use crate::error::Result; -use crate::event::Event; +use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; use crate::nip05; use crate::schema::{upgrade_db, STARTUP_SQL}; use crate::subscription::ReqFilter; use crate::subscription::Subscription; -use crate::utils::is_hex; +use crate::utils::{is_hex, is_lower_hex}; use governor::clock::Clock; use governor::{Quota, RateLimiter}; use hex; @@ -300,17 +300,24 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> { if tag.len() >= 2 { let tagname = &tag[0]; let tagval = &tag[1]; - // if tagvalue is hex; - if is_hex(tagval) { - tx.execute( - "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", - params![ev_id, &tagname, hex::decode(&tagval).ok()], - )?; - } else { - tx.execute( - "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)", - params![ev_id, &tagname, &tagval], - )?; + // only single-char tags are searchable + let tagchar_opt = single_char_tagname(tagname); + match &tagchar_opt { + Some(_) => { + // if tagvalue is lowercase hex; + if is_lower_hex(&tagval) && (tagval.len() % 2 == 0) { + tx.execute( + "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", + params![ev_id, &tagname, hex::decode(&tagval).ok()], + )?; + } else { + tx.execute( + "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)", + params![ev_id, &tagname, &tagval], + )?; + } + } + None => {} } } } @@ -388,16 +395,13 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) { // if the filter is malformed, don't return anything. if f.force_no_match { let empty_query = - "SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0" - .to_owned(); + "SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned(); // query parameters for SQLite let empty_params: Vec<Box<dyn ToSql>> = vec![]; return (empty_query, empty_params); } - let mut query = - "SELECT DISTINCT(e.content), e.created_at FROM event e " - .to_owned(); + let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned(); // query parameters for SQLite let mut params: Vec<Box<dyn ToSql>> = vec![]; @@ -470,7 +474,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) { let mut str_vals: Vec<Box<dyn ToSql>> = vec![]; let mut blob_vals: Vec<Box<dyn ToSql>> = vec![]; for v in val { - if is_hex(v) { + if (v.len()%2==0) && is_lower_hex(v) { if let Ok(h) = hex::decode(&v) { blob_vals.push(Box::new(h)); } @@ -481,7 +485,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) { // create clauses with "?" params for each tag value being searched let str_clause = format!("value IN ({})", repeat_vars(str_vals.len())); let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len())); - // find evidence of the target tag name/value existing for this event. + // find evidence of the target tag name/value existing for this event. let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause); // add the tag name as the first parameter params.push(Box::new(key.to_string())); @@ -552,6 +556,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) { /// query is immediately aborted. pub async fn db_query( sub: Subscription, + client_id: String, pool: SqlitePool, query_tx: tokio::sync::mpsc::Sender<QueryResult>, mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, @@ -573,13 +578,18 @@ pub async fn db_query( let mut first_result = true; while let Some(row) = event_rows.next()? { if first_result { - debug!("time to first result: {:?}", start.elapsed()); + debug!( + "time to first result: {:?} (cid={}, sub={:?})", + start.elapsed(), + client_id, + sub.id + ); first_result = false; } // check if this is still active // TODO: check every N rows if abandon_query_rx.try_recv().is_ok() { - debug!("query aborted"); + debug!("query aborted (sub={:?})", sub.id); return Ok(()); } row_count += 1; @@ -598,9 +608,11 @@ pub async fn db_query( }) .ok(); debug!( - "query completed ({} rows) in {:?}", + "query completed ({} rows) in {:?} (cid={}, sub={:?})", row_count, - start.elapsed() + start.elapsed(), + client_id, + sub.id ); } else { warn!("Could not get a database connection for querying"); diff --git a/src/event.rs b/src/event.rs @@ -57,7 +57,7 @@ where } /// Attempt to form a single-char tag name. -fn single_char_tagname(tagname: &str) -> Option<char> { +pub fn single_char_tagname(tagname: &str) -> Option<char> { // We return the tag character if and only if the tagname consists // of a single char. let mut tagnamechars = tagname.chars(); @@ -301,7 +301,7 @@ mod tests { fn empty_event_tag_match() -> Result<()> { let event = simple_event(); assert!(!event - .generic_tag_val_intersect("e", &HashSet::from(["foo".to_owned(), "bar".to_owned()]))); + .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()]))); Ok(()) } @@ -312,7 +312,7 @@ mod tests { event.build_index(); assert_eq!( event.generic_tag_val_intersect( - "e", + 'e', &HashSet::from(["foo".to_owned(), "bar".to_owned()]) ), true diff --git a/src/main.rs b/src/main.rs @@ -1,4 +1,3 @@ - //! Server process use futures::SinkExt; use futures::StreamExt; @@ -133,19 +132,19 @@ async fn handle_web_request( let rinfo = RelayInfo::from(config.info.clone()); let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap()); return Ok(Response::builder() - .status(200) - .header("Content-Type", "application/nostr+json") - .header("Access-Control-Allow-Origin", "*") - .body(b) - .unwrap()); + .status(200) + .header("Content-Type", "application/nostr+json") + .header("Access-Control-Allow-Origin", "*") + .body(b) + .unwrap()); } } } - Ok(Response::builder() - .status(200) - .header("Content-Type", "text/plain") - .body(Body::from("Please use a Nostr client to connect.")).unwrap() - ) + Ok(Response::builder() + .status(200) + .header("Content-Type", "text/plain") + .body(Body::from("Please use a Nostr client to connect.")) + .unwrap()) } (_, _) => { //handle any other url @@ -533,7 +532,7 @@ async fn nostr_server( previous_query.send(()).ok(); } // start a database query - db::db_query(s, pool.clone(), query_tx.clone(), abandon_query_rx).await; + db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await; }, Err(e) => { info!("Subscription error: {}", e); diff --git a/src/schema.rs b/src/schema.rs @@ -1,13 +1,13 @@ //! Database schema and migrations use crate::db::PooledConnection; use crate::error::Result; -use crate::utils::is_hex; +use crate::event::{single_char_tagname, Event}; +use crate::utils::is_lower_hex; use log::*; use rusqlite::limits::Limit; use rusqlite::params; use rusqlite::Connection; - -// TODO: drop the pubkey_ref and event_ref tables +use std::time::Instant; /// Startup DB Pragmas pub const STARTUP_SQL: &str = r##" @@ -24,7 +24,7 @@ PRAGMA journal_mode=WAL; PRAGMA main.synchronous=NORMAL; PRAGMA foreign_keys = ON; PRAGMA application_id = 1654008667; -PRAGMA user_version = 5; +PRAGMA user_version = 6; -- Event Table CREATE TABLE IF NOT EXISTS event ( @@ -53,7 +53,7 @@ id INTEGER PRIMARY KEY, event_id INTEGER NOT NULL, -- an event ID that contains a tag. name TEXT, -- the tag name ("p", "e", whatever) value TEXT, -- the tag value, if not hex. -value_hex BLOB, -- the tag value, if it can be interpreted as a hex string. +value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string. FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value); @@ -103,7 +103,7 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> { if curr_version == 0 { match conn.execute_batch(INIT_SQL) { Ok(()) => { - info!("database pragma/schema initialized to v4, and ready"); + info!("database pragma/schema initialized to v6, and ready"); } Err(err) => { error!("update failed: {}", err); @@ -154,7 +154,6 @@ PRAGMA user_version = 3; panic!("database could not be upgraded"); } } - info!("Starting transaction"); // iterate over every event/pubkey tag let tx = conn.transaction()?; { @@ -166,7 +165,7 @@ PRAGMA user_version = 3; let tag_name: String = row.get(1)?; let tag_value: String = row.get(2)?; // this will leave behind p/e tags that were non-hex, but they are invalid anyways. - if is_hex(&tag_value) { + if is_lower_hex(&tag_value) { tx.execute( "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", params![event_id, tag_name, hex::decode(&tag_value).ok()], @@ -225,9 +224,63 @@ PRAGMA user_version=5; } } } else if curr_version == 5 { - debug!("Database version was already current"); - } else if curr_version > 5 { - panic!("Database version is newer than supported by this executable"); + info!("database schema needs update from 5->6"); + // We need to rebuild the tags table. iterate through the + // event table. build event from json, insert tags into a + // fresh tag table. This was needed due to a logic error in + // how hex-like tags got indexed. + let start = Instant::now(); + let tx = conn.transaction()?; + { + // Clear out table + tx.execute("DELETE FROM tag;", [])?; + let mut stmt = tx.prepare("select id, content from event order by id;")?; + let mut tag_rows = stmt.query([])?; + while let Some(row) = tag_rows.next()? { + // we want to capture the event_id that had the tag, the tag name, and the tag hex value. + let event_id: u64 = row.get(0)?; + let event_json: String = row.get(1)?; + let event: Event = serde_json::from_str(&event_json)?; + // look at each event, and each tag, creating new tag entries if appropriate. + for t in event.tags.iter().filter(|x| x.len() > 1) { + let tagname = t.get(0).unwrap(); + let tagnamechar_opt = single_char_tagname(tagname); + if tagnamechar_opt.is_none() { + continue; + } + // safe because len was > 1 + let tagval = t.get(1).unwrap(); + // insert as BLOB if we can restore it losslessly. + // this means it needs to be even length and lowercase. + if (tagval.len() % 2 == 0) && is_lower_hex(&tagval) { + tx.execute( + "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", + params![event_id, tagname, hex::decode(&tagval).ok()], + )?; + } else { + // otherwise, insert as text + tx.execute( + "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);", + params![event_id, tagname, &tagval], + )?; + } + } + } + tx.execute("PRAGMA user_version = 6;", [])?; + } + tx.commit()?; + info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed()); + // vacuum after large table modification + let start = Instant::now(); + conn.execute("VACUUM;", [])?; + info!("vacuumed DB after tags rebuild in {:?}", start.elapsed()); + } else if curr_version == 6 { + debug!("Database version was already current (v6)"); + } else if curr_version > 7 { + panic!( + "Database version is newer than supported by this executable (v{})", + curr_version + ); } // Setup PRAGMA diff --git a/src/subscription.rs b/src/subscription.rs @@ -1,7 +1,6 @@ //! Subscription and filter parsing use crate::error::Result; use crate::event::Event; -use log::*; use serde::de::Unexpected; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; @@ -80,9 +79,7 @@ impl<'de> Deserialize<'de> for ReqFilter { } else if key == "authors" { rf.authors = Deserialize::deserialize(val).ok(); } else if key.starts_with('#') && key.len() > 1 && val.is_array() { - info!("testing tag search char: {}", key); if let Some(tag_search) = tag_search_char_from_filter(key) { - info!("found a character from the tag search: {}", tag_search); if ts.is_none() { // Initialize the tag if necessary ts = Some(HashMap::new()); diff --git a/src/utils.rs b/src/utils.rs @@ -13,3 +13,21 @@ pub fn unix_time() -> u64 { pub fn is_hex(s: &str) -> bool { s.chars().all(|x| char::is_ascii_hexdigit(&x)) } + +/// Check if a string contains only lower-case hex chars. +pub fn is_lower_hex(s: &str) -> bool { + s.chars().all(|x| { + (char::is_ascii_lowercase(&x) || char::is_ascii_digit(&x)) && char::is_ascii_hexdigit(&x) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn lower_hex() { + let hexstr = "abcd0123"; + assert_eq!(is_lower_hex(hexstr), true); + } +}