commit 2d28a95ff7c5f81d7d1de127381e9346e8433eb7
parent 8c93ef5bc28d3240d11c5fb87eda44799d900db9
Author: Greg Heartsfield <scsibug@imap.cc>
Date: Sat, 22 Jan 2022 21:29:15 -0600
feat: allow arbitrary tag queries
This is an experimental feature, outside of any NIP, that demonstrates
generic tag queries.
Instead of limiting subscription filters to just querying only "e" or
"p" tags (via `#e` or `#p` attributes), any tag can be queried.
As an example, consider an event which uses a tag "url". With this
modification, a subscription filter could add a top-level field
"#url", with an array of strings as the key. Exact matches would be
returned.
A NIP is forthcoming to formalize this.
Diffstat:
5 files changed, 248 insertions(+), 141 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
@@ -19,7 +19,7 @@ secp256k1 = {git = "https://github.com/rust-bitcoin/rust-secp256k1.git", rev = "
serde = { version = "^1.0", features = ["derive"] }
serde_json = {version = "^1.0", features = ["preserve_order"]}
hex = "^0.4"
-rusqlite = "^0.26"
+rusqlite = { version = "^0.26", features = ["limits"]}
lazy_static = "^1.4"
governor = "^0.4"
nonzero_ext = "^0.3"
diff --git a/src/db.rs b/src/db.rs
@@ -11,6 +11,8 @@ use rusqlite::Connection;
use rusqlite::OpenFlags;
//use std::num::NonZeroU32;
use crate::config::SETTINGS;
+use rusqlite::limits::Limit;
+use rusqlite::types::ToSql;
use std::path::Path;
use std::thread;
use std::time::Instant;
@@ -34,7 +36,7 @@ PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
-PRAGMA user_version = 2;
+PRAGMA user_version = 3;
-- Event Table
CREATE TABLE IF NOT EXISTS event (
@@ -54,6 +56,21 @@ CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
+-- Tag Table
+-- Tag values are stored as either a BLOB (if they come in as a
+-- hex-string), or TEXT otherwise.
+-- This means that searches need to select the appropriate column.
+CREATE TABLE IF NOT EXISTS tag (
+id INTEGER PRIMARY KEY,
+event_id INTEGER NOT NULL, -- an event ID that contains a tag.
+name TEXT, -- the tag name ("p", "e", whatever)
+value TEXT, -- the tag value, if not hex.
+value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
+FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
+);
+CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
+CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
+
-- Event References Table
CREATE TABLE IF NOT EXISTS event_ref (
id INTEGER PRIMARY KEY,
@@ -80,19 +97,36 @@ CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
/// Upgrade DB to latest version, and execute pragma settings
pub fn upgrade_db(conn: &mut Connection) -> Result<()> {
// check the version.
- let curr_version = db_version(conn)?;
+ let mut curr_version = db_version(conn)?;
info!("DB version = {:?}", curr_version);
+ debug!(
+ "SQLite max query parameters: {}",
+ conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER)
+ );
+ debug!(
+ "SQLite max table/blob/text length: {} MB",
+ (conn.limit(Limit::SQLITE_LIMIT_LENGTH) as f64 / (1024 * 1024) as f64).floor()
+ );
+ debug!(
+ "SQLite max SQL length: {} MB",
+ (conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor()
+ );
+
// initialize from scratch
if curr_version == 0 {
match conn.execute_batch(INIT_SQL) {
- Ok(()) => info!("database pragma/schema initialized to v2, and ready"),
+ Ok(()) => {
+ info!("database pragma/schema initialized to v3, and ready");
+ //curr_version = 3;
+ }
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be initialized");
}
}
- } else if curr_version == 1 {
+ }
+ if curr_version == 1 {
// only change is adding a hidden column to events.
let upgrade_sql = r##"
ALTER TABLE event ADD hidden INTEGER;
@@ -100,19 +134,73 @@ UPDATE event SET hidden=FALSE;
PRAGMA user_version = 2;
"##;
match conn.execute_batch(upgrade_sql) {
- Ok(()) => info!("database schema upgraded v1 -> v2"),
+ Ok(()) => {
+ info!("database schema upgraded v1 -> v2");
+ curr_version = 2;
+ }
+ Err(err) => {
+ error!("update failed: {}", err);
+ panic!("database could not be upgraded");
+ }
+ }
+ }
+ if curr_version == 2 {
+ // this version lacks the tag column
+ debug!("database schema needs update from 2->3");
+ let upgrade_sql = r##"
+CREATE TABLE IF NOT EXISTS tag (
+id INTEGER PRIMARY KEY,
+event_id INTEGER NOT NULL, -- an event ID that contains a tag.
+name TEXT, -- the tag name ("p", "e", whatever)
+value TEXT, -- the tag value, if not hex.
+value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
+FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
+);
+CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
+CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
+PRAGMA user_version = 3;
+"##;
+ // TODO: load existing refs into tag table
+ match conn.execute_batch(upgrade_sql) {
+ Ok(()) => {
+ info!("database schema upgraded v2 -> v3");
+ //curr_version = 3;
+ }
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
- } else if curr_version == 2 {
+ info!("Starting transaction");
+ // iterate over every event/pubkey tag
+ let tx = conn.transaction()?;
+ {
+ let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?;
+ let mut tag_rows = stmt.query([])?;
+ while let Some(row) = tag_rows.next()? {
+ // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
+ let event_id: u64 = row.get(0)?;
+ let tag_name: String = row.get(1)?;
+ let tag_value: String = row.get(2)?;
+ // this will leave behind p/e tags that were non-hex, but they are invalid anyways.
+ if is_hex(&tag_value) {
+ tx.execute(
+ "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
+ params![event_id, tag_name, hex::decode(&tag_value).ok()],
+ )?;
+ }
+ }
+ }
+ tx.commit()?;
+ info!("Upgrade complete");
+ } else if curr_version == 3 {
debug!("Database version was already current");
- } else if curr_version > 2 {
+ } else if curr_version > 3 {
panic!("Database version is newer than supported by this executable");
}
// Setup PRAGMA
conn.execute_batch(STARTUP_SQL)?;
+ info!("Finished pragma");
Ok(())
}
@@ -134,6 +222,7 @@ pub async fn db_writer(
)?;
info!("opened database {:?} for writing", full_path);
upgrade_db(&mut conn)?;
+
// get rate limit settings
let rps_setting = config.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
@@ -234,24 +323,24 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
}
// remember primary key of the event most recently inserted.
let ev_id = tx.last_insert_rowid();
- // add all event tags into the event_ref table
- let etags = e.get_event_tags();
- if !etags.is_empty() {
- for etag in etags.iter() {
- tx.execute(
- "INSERT OR IGNORE INTO event_ref (event_id, referenced_event) VALUES (?1, ?2)",
- params![ev_id, hex::decode(&etag).ok()],
- )?;
- }
- }
- // add all event tags into the pubkey_ref table
- let ptags = e.get_pubkey_tags();
- if !ptags.is_empty() {
- for ptag in ptags.iter() {
- tx.execute(
- "INSERT OR IGNORE INTO pubkey_ref (event_id, referenced_pubkey) VALUES (?1, ?2)",
- params![ev_id, hex::decode(&ptag).ok()],
- )?;
+ // add all tags to the tag table
+ for tag in e.tags.iter() {
+ // ensure we have 2 values.
+ if tag.len() >= 2 {
+ let tagname = &tag[0];
+ let tagval = &tag[1];
+ // if tagvalue is hex;
+ if is_hex(tagval) {
+ tx.execute(
+ "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
+ params![ev_id, &tagname, hex::decode(&tagval).ok()],
+ )?;
+ } else {
+ tx.execute(
+ "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
+ params![ev_id, &tagname, &tagval],
+ )?;
+ }
}
}
// if this event is for a metadata update, hide every other kind=0
@@ -294,14 +383,27 @@ fn is_hex(s: &str) -> bool {
s.chars().all(|x| char::is_ascii_hexdigit(&x))
}
-/// Create a dynamic SQL query string from a subscription.
-fn query_from_sub(sub: &Subscription) -> String {
+fn repeat_vars(count: usize) -> String {
+ if count == 0 {
+ return "".to_owned();
+ }
+ let mut s = "?,".repeat(count);
+ // Remove trailing comma
+ s.pop();
+ s
+}
+
+/// Create a dynamic SQL query string and params from a subscription.
+fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
- // hexadecimal characters.
+ // hexadecimal characters. Strings that require escaping (tag
+ // names/values) use parameters.
let mut query =
- "SELECT DISTINCT(e.content) FROM event e LEFT JOIN event_ref er ON e.id=er.event_id LEFT JOIN pubkey_ref pr ON e.id=pr.event_id "
- .to_owned();
+ "SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned();
+ // parameters
+ let mut params: Vec<Box<dyn ToSql>> = vec![];
+
// for every filter in the subscription, generate a where clause
let mut filter_clauses: Vec<String> = Vec::new();
for f in sub.filters.iter() {
@@ -340,33 +442,33 @@ fn query_from_sub(sub: &Subscription) -> String {
let id_clause = format!("event_hash IN ({})", ids_escaped.join(", "));
filter_components.push(id_clause);
}
- // Query for referenced event
- if f.events.is_some() {
- let events_escaped: Vec<String> = f
- .events
- .as_ref()
- .unwrap()
- .iter()
- .filter(|&x| is_hex(x))
- .map(|x| format!("x'{}'", x))
- .collect();
- let events_clause = format!("referenced_event IN ({})", events_escaped.join(", "));
- filter_components.push(events_clause);
- }
- // Query for referenced pubkey
- if f.pubkeys.is_some() {
- let pubkeys_escaped: Vec<String> = f
- .pubkeys
- .as_ref()
- .unwrap()
- .iter()
- .filter(|&x| is_hex(x))
- .map(|x| format!("x'{}'", x))
- .collect();
- let pubkeys_clause = format!("referenced_pubkey IN ({})", pubkeys_escaped.join(", "));
- filter_components.push(pubkeys_clause);
+ // Query for tags
+ if let Some(map) = &f.tags {
+ for (key, val) in map.iter() {
+ let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
+ let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
+ for v in val {
+ if is_hex(v) {
+ if let Ok(h) = hex::decode(&v) {
+ blob_vals.push(Box::new(h));
+ }
+ } else {
+ str_vals.push(Box::new(v.to_owned()));
+ }
+ }
+ // create clauses with "?" params for each tag value being searched
+ let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
+ let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
+ let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause);
+ // add the tag name as the first parameter
+ params.push(Box::new(key.to_owned()));
+ // add all tag values that are plain strings as params
+ params.append(&mut str_vals);
+ // add all tag values that are blobs as params
+ params.append(&mut blob_vals);
+ filter_components.push(tag_clause);
+ }
}
-
// Query for timestamp
if f.since.is_some() {
let created_clause = format!("created_at > {}", f.since.unwrap());
@@ -398,7 +500,7 @@ fn query_from_sub(sub: &Subscription) -> String {
// add order clause
query.push_str(" ORDER BY created_at ASC");
debug!("query string: {}", query);
- query
+ (query, params)
}
/// Perform a database query using a subscription.
@@ -423,10 +525,10 @@ pub async fn db_query(
let mut row_count: usize = 0;
let start = Instant::now();
// generate SQL query
- let q = query_from_sub(&sub);
+ let (q, p) = query_from_sub(&sub);
// execute the query
let mut stmt = conn.prepare(&q)?;
- let mut event_rows = stmt.query([])?;
+ let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
while let Some(row) = event_rows.next()? {
// check if this is still active (we could do this every N rows)
if abandon_query_rx.try_recv().is_ok() {
@@ -448,6 +550,6 @@ pub async fn db_query(
start.elapsed()
);
let ok: Result<()> = Ok(());
- return ok;
+ ok
});
}
diff --git a/src/event.rs b/src/event.rs
@@ -193,39 +193,8 @@ impl Event {
serde_json::Value::Array(tags)
}
- /// Get a list of event tags.
- pub fn get_event_tags(&self) -> Vec<&str> {
- let mut etags = vec![];
- for t in self.tags.iter() {
- if t.len() >= 2 && t.get(0).unwrap() == "e" {
- etags.push(&t.get(1).unwrap()[..]);
- }
- }
- etags
- }
-
- /// Get a list of pubkey/petname tags.
- pub fn get_pubkey_tags(&self) -> Vec<&str> {
- let mut ptags = vec![];
- for t in self.tags.iter() {
- if t.len() >= 2 && t.get(0).unwrap() == "p" {
- ptags.push(&t.get(1).unwrap()[..]);
- }
- }
- ptags
- }
-
- /// Check if a given event is referenced in an event tag.
- pub fn event_tag_match(&self, eventid: &str) -> bool {
- self.get_event_tags().contains(&eventid)
- }
-
- /// Check if a given event is referenced in an event tag.
- pub fn pubkey_tag_match(&self, pubkey: &str) -> bool {
- self.get_pubkey_tags().contains(&pubkey)
- }
-
/// Generic tag match
+ // TODO: is this used anywhere?
pub fn generic_tag_match(&self, tagname: &str, tagvalue: &str) -> bool {
match &self.tagidx {
Some(idx) => {
diff --git a/src/protostream.rs b/src/protostream.rs
@@ -81,8 +81,14 @@ impl Stream for NostrStream {
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(v)) => match v {
Ok(Message::Text(vs)) => Poll::Ready(Some(convert(vs))),
+ Ok(Message::Ping(_x)) => {
+ debug!("client ping");
+ //Pin::new(&mut self.ws_stream).start_send(Message::Pong(x));
+ //info!("sent pong");
+ Poll::Pending
+ }
Ok(Message::Binary(_)) => Poll::Ready(Some(Err(Error::ProtoParseError))),
- Ok(Message::Pong(_)) | Ok(Message::Ping(_)) => Poll::Pending,
+ Ok(Message::Pong(_)) => Poll::Pending,
Ok(Message::Close(_)) => Poll::Ready(None),
Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None),
Err(_) => Poll::Ready(Some(Err(Error::ConnError))),
diff --git a/src/subscription.rs b/src/subscription.rs
@@ -1,7 +1,10 @@
//! Subscription and filter parsing
use crate::error::Result;
use crate::event::Event;
+use serde::de::Unexpected;
use serde::{Deserialize, Deserializer, Serialize};
+use serde_json::Value;
+use std::collections::HashMap;
use std::collections::HashSet;
/// Subscription identifier and set of request filters
@@ -16,30 +19,76 @@ pub struct Subscription {
/// Corresponds to client-provided subscription request elements. Any
/// element can be present if it should be used in filtering, or
/// absent ([`None`]) if it should be ignored.
-#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[derive(Serialize, PartialEq, Debug, Clone)]
pub struct ReqFilter {
/// Event hashes
pub ids: Option<Vec<String>>,
/// Event kinds
pub kinds: Option<Vec<u64>>,
- /// Referenced event hash
- #[serde(rename = "#e")]
- pub events: Option<Vec<String>>,
- /// Referenced public key for a petname
- #[serde(rename = "#p")]
- pub pubkeys: Option<Vec<String>>,
/// Events published after this time
pub since: Option<u64>,
/// Events published before this time
pub until: Option<u64>,
/// List of author public keys
pub authors: Option<Vec<String>>,
- /// Set of event tags, for quick indexing
- #[serde(skip)]
- event_tag_set: Option<HashSet<String>>,
- /// Set of pubkey tags, for quick indexing
+ /// Set of tags
#[serde(skip)]
- pubkey_tag_set: Option<HashSet<String>>,
+ pub tags: Option<HashMap<String, HashSet<String>>>,
+}
+
+impl<'de> Deserialize<'de> for ReqFilter {
+ fn deserialize<D>(deserializer: D) -> Result<ReqFilter, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let received: Value = Deserialize::deserialize(deserializer)?;
+ let filter = received.as_object().ok_or_else(|| {
+ serde::de::Error::invalid_type(
+ Unexpected::Other("reqfilter is not an object"),
+ &"a json object",
+ )
+ })?;
+ let mut rf = ReqFilter {
+ ids: None,
+ kinds: None,
+ since: None,
+ until: None,
+ authors: None,
+ tags: None,
+ };
+ let mut ts = None;
+ // iterate through each key, and assign values that exist
+ for (key, val) in filter.into_iter() {
+ // ids
+ if key == "ids" {
+ rf.ids = Deserialize::deserialize(val).ok();
+ } else if key == "kinds" {
+ rf.kinds = Deserialize::deserialize(val).ok();
+ } else if key == "since" {
+ rf.since = Deserialize::deserialize(val).ok();
+ } else if key == "until" {
+ rf.until = Deserialize::deserialize(val).ok();
+ } else if key == "authors" {
+ rf.authors = Deserialize::deserialize(val).ok();
+ } else if key.starts_with('#') && key.len() > 1 && val.is_array() {
+ // remove the prefix
+ let tagname = &key[1..];
+ if ts.is_none() {
+ // Initialize the tag if necessary
+ ts = Some(HashMap::new());
+ }
+ if let Some(m) = ts.as_mut() {
+ let tag_vals: Option<Vec<String>> = Deserialize::deserialize(val).ok();
+ if let Some(v) = tag_vals {
+ let hs = HashSet::from_iter(v.into_iter());
+ m.insert(tagname.to_owned(), hs);
+ }
+ };
+ }
+ }
+ rf.tags = ts;
+ Ok(rf)
+ }
}
impl<'de> Deserialize<'de> for Subscription {
@@ -49,7 +98,7 @@ impl<'de> Deserialize<'de> for Subscription {
where
D: Deserializer<'de>,
{
- let mut v: serde_json::Value = Deserialize::deserialize(deserializer)?;
+ let mut v: Value = Deserialize::deserialize(deserializer)?;
// this shoud be a 3-or-more element array.
// verify the first element is a String, REQ
// get the subscription from the second element.
@@ -82,10 +131,9 @@ impl<'de> Deserialize<'de> for Subscription {
let mut filters = vec![];
for fv in i {
- let mut f: ReqFilter = serde_json::from_value(fv.take())
+ let f: ReqFilter = serde_json::from_value(fv.take())
.map_err(|_| serde::de::Error::custom("could not parse filter"))?;
// create indexes
- f.update_tag_indexes();
filters.push(f);
}
Ok(Subscription {
@@ -113,15 +161,6 @@ impl Subscription {
}
impl ReqFilter {
- /// Update pubkey and event indexes
- fn update_tag_indexes(&mut self) {
- if let Some(event_vec) = &self.events {
- self.event_tag_set = Some(HashSet::from_iter(event_vec.iter().cloned()));
- }
- if let Some(pubkey_vec) = &self.pubkeys {
- self.pubkey_tag_set = Some(HashSet::from_iter(pubkey_vec.iter().cloned()));
- }
- }
/// Check for a match within the authors list.
fn ids_match(&self, event: &Event) -> bool {
self.ids
@@ -137,28 +176,20 @@ impl ReqFilter {
.unwrap_or(true)
}
- /// Check if this filter either matches, or does not care about the event tags.
- fn event_match(&self, event: &Event) -> bool {
- // an event match is performed by looking at the ReqFilter events field, and sending a hashset to the event to intersect with.
- if let Some(es) = &self.event_tag_set {
- // if there exists event tags in this filter, find if any intersect.
- event.generic_tag_val_intersect("e", es)
- } else {
- // if no event tags were requested in a filter, we do match
- true
- }
- }
-
- /// Check if this filter either matches, or does not care about the event tags.
- fn pubkey_match(&self, event: &Event) -> bool {
- // an event match is performed by looking at the ReqFilter events field, and sending a hashset to the event to intersect with.
- if let Some(ps) = &self.pubkey_tag_set {
- // if there exists event tags in this filter, find if any intersect.
- event.generic_tag_val_intersect("p", ps)
- } else {
- // if no event tags were requested in a filter, we do match
- true
+ fn tag_match(&self, event: &Event) -> bool {
+ // get the hashset from the filter.
+ if let Some(map) = &self.tags {
+ for (key, val) in map.iter() {
+ let tag_match = event.generic_tag_val_intersect(key, val);
+ // if there is no match for this tag, the match fails.
+ if !tag_match {
+ return false;
+ }
+ // if there was a match, we move on to the next one.
+ }
}
+ // if the tag map is empty, the match succeeds (there was no filter)
+ true
}
/// Check if this filter either matches, or does not care about the kind.
@@ -176,8 +207,7 @@ impl ReqFilter {
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
&& self.kind_match(event.kind)
&& self.authors_match(event)
- && self.pubkey_match(event)
- && self.event_match(event)
+ && self.tag_match(event)
}
}