nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

commit 2af5f9fbe88405c148ad20538d00013e5d410383
parent 2739e4936243e67d73d6fff4821d39b0890251b7
Author: Greg Heartsfield <scsibug@imap.cc>
Date:   Sun,  9 Oct 2022 08:24:01 -0500

fix: correct schema upgrade logic (and refactor)

Schema upgrades were buggy from 4->5 (the v5 would be skipped).  This
change also refactors the logic slightly so that future additions can
be clearer (no need to have if and else-if combinations).

Diffstat:
MCargo.lock | 27+++++++++++++++++++++++++++
MCargo.toml | 1+
Msrc/schema.rs | 306++++++++++++++++++++++++++++++++++++++++++-------------------------------------
3 files changed, 192 insertions(+), 142 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -287,6 +287,26 @@ dependencies = [ ] [[package]] +name = "const_format" +version = "0.2.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79f926bc2341a80e6bb5a16e18057c8c90ca3edbdeb9fa497bd0f82b1f4df4e6" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef196d5d972878a48da7decb7686eded338b4858fbabeed513d63a7c98b2b82d" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] name = "core-foundation" version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1008,6 +1028,7 @@ dependencies = [ "bitcoin_hashes", "config", "console-subscriber", + "const_format", "futures", "futures-util", "governor", @@ -2217,6 +2238,12 @@ dependencies = [ ] [[package]] +name = "unicode-xid" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + +[[package]] name = "url" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -31,6 +31,7 @@ hyper-tls = "0.5" http = { version = "0.2" } parse_duration = "2" rand = "0.8" +const_format = "0.2.28" [dev-dependencies] anyhow = "1" diff --git a/src/schema.rs b/src/schema.rs @@ -3,9 +3,11 @@ use crate::db::PooledConnection; use crate::error::Result; use crate::event::{single_char_tagname, Event}; use crate::utils::is_lower_hex; +use const_format::formatcp; use rusqlite::limits::Limit; use rusqlite::params; use rusqlite::Connection; +use std::cmp::Ordering; use std::time::Instant; use tracing::{debug, error, info}; @@ -17,15 +19,19 @@ PRAGMA journal_size_limit=32768; pragma mmap_size = 536870912; -- 512MB of mmap "##; +/// Latest database version +pub const DB_VERSION: usize = 6; + /// Schema definition -const INIT_SQL: &str = r##" +const INIT_SQL: &str = formatcp!( + r##" -- Database settings PRAGMA encoding = "UTF-8"; PRAGMA journal_mode=WAL; PRAGMA main.synchronous=NORMAL; PRAGMA foreign_keys = ON; PRAGMA application_id = 1654008667; -PRAGMA user_version = 6; +PRAGMA user_version = {}; -- Event Table CREATE TABLE IF NOT EXISTS event ( @@ -72,7 +78,9 @@ FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CAS ); CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name); CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event); -"##; +"##, + DB_VERSION +); /// Determine the current application database schema version. pub fn db_version(conn: &mut Connection) -> Result<usize> { @@ -100,40 +108,45 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> { (conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor() ); - // initialize from scratch - if curr_version == 0 { - match conn.execute_batch(INIT_SQL) { - Ok(()) => { - info!("database pragma/schema initialized to v6, and ready"); - } - Err(err) => { - error!("update failed: {}", err); - panic!("database could not be initialized"); + match curr_version.cmp(&DB_VERSION) { + // Database is new or not current + Ordering::Less => { + // initialize from scratch + if curr_version == 0 { + match conn.execute_batch(INIT_SQL) { + Ok(()) => { + info!("database pragma/schema initialized to v6, and ready"); + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be initialized"); + } + } } - } - } - if curr_version == 1 { - // only change is adding a hidden column to events. - let upgrade_sql = r##" + + if curr_version == 1 { + // only change is adding a hidden column to events. + let upgrade_sql = r##" ALTER TABLE event ADD hidden INTEGER; UPDATE event SET hidden=FALSE; PRAGMA user_version = 2; "##; - match conn.execute_batch(upgrade_sql) { - Ok(()) => { - info!("database schema upgraded v1 -> v2"); - curr_version = 2; - } - Err(err) => { - error!("update failed: {}", err); - panic!("database could not be upgraded"); + match conn.execute_batch(upgrade_sql) { + Ok(()) => { + info!("database schema upgraded v1 -> v2"); + curr_version = 2; + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); + } + } } - } - } - if curr_version == 2 { - // this version lacks the tag column - info!("database schema needs update from 2->3"); - let upgrade_sql = r##" + + if curr_version == 2 { + // this version lacks the tag column + info!("database schema needs update from 2->3"); + let upgrade_sql = r##" CREATE TABLE IF NOT EXISTS tag ( id INTEGER PRIMARY KEY, event_id INTEGER NOT NULL, -- an event ID that contains a tag. @@ -144,42 +157,43 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE ); PRAGMA user_version = 3; "##; - // TODO: load existing refs into tag table - match conn.execute_batch(upgrade_sql) { - Ok(()) => { - info!("database schema upgraded v2 -> v3"); - curr_version = 3; - } - Err(err) => { - error!("update failed: {}", err); - panic!("database could not be upgraded"); - } - } - // iterate over every event/pubkey tag - let tx = conn.transaction()?; - { - let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?; - let mut tag_rows = stmt.query([])?; - while let Some(row) = tag_rows.next()? { - // we want to capture the event_id that had the tag, the tag name, and the tag hex value. - let event_id: u64 = row.get(0)?; - let tag_name: String = row.get(1)?; - let tag_value: String = row.get(2)?; - // this will leave behind p/e tags that were non-hex, but they are invalid anyways. - if is_lower_hex(&tag_value) { - tx.execute( - "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", - params![event_id, tag_name, hex::decode(&tag_value).ok()], - )?; + // TODO: load existing refs into tag table + match conn.execute_batch(upgrade_sql) { + Ok(()) => { + info!("database schema upgraded v2 -> v3"); + curr_version = 3; + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); + } + } + // iterate over every event/pubkey tag + let tx = conn.transaction()?; + { + let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?; + let mut tag_rows = stmt.query([])?; + while let Some(row) = tag_rows.next()? { + // we want to capture the event_id that had the tag, the tag name, and the tag hex value. + let event_id: u64 = row.get(0)?; + let tag_name: String = row.get(1)?; + let tag_value: String = row.get(2)?; + // this will leave behind p/e tags that were non-hex, but they are invalid anyways. + if is_lower_hex(&tag_value) { + tx.execute( + "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", + params![event_id, tag_name, hex::decode(&tag_value).ok()], + )?; + } + } } + tx.commit()?; + info!("Upgrade complete"); } - } - tx.commit()?; - info!("Upgrade complete"); - } - if curr_version == 3 { - info!("database schema needs update from 3->4"); - let upgrade_sql = r##" + + if curr_version == 3 { + info!("database schema needs update from 3->4"); + let upgrade_sql = r##" -- incoming metadata events with nip05 CREATE TABLE IF NOT EXISTS user_verification ( id INTEGER PRIMARY KEY, @@ -194,94 +208,102 @@ CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(nam CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event); PRAGMA user_version = 4; "##; - match conn.execute_batch(upgrade_sql) { - Ok(()) => { - info!("database schema upgraded v3 -> v4"); - curr_version = 4; - } - Err(err) => { - error!("update failed: {}", err); - panic!("database could not be upgraded"); + match conn.execute_batch(upgrade_sql) { + Ok(()) => { + info!("database schema upgraded v3 -> v4"); + curr_version = 4; + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); + } + } } - } - } - if curr_version == 4 { - info!("database schema needs update from 4->5"); - let upgrade_sql = r##" + if curr_version == 4 { + info!("database schema needs update from 4->5"); + let upgrade_sql = r##" DROP TABLE IF EXISTS event_ref; DROP TABLE IF EXISTS pubkey_ref; PRAGMA user_version=5; "##; - match conn.execute_batch(upgrade_sql) { - Ok(()) => { - info!("database schema upgraded v4 -> v5"); - // uncomment if we have a newer version - //curr_version = 5; - } - Err(err) => { - error!("update failed: {}", err); - panic!("database could not be upgraded"); - } - } - } else if curr_version == 5 { - info!("database schema needs update from 5->6"); - // We need to rebuild the tags table. iterate through the - // event table. build event from json, insert tags into a - // fresh tag table. This was needed due to a logic error in - // how hex-like tags got indexed. - let start = Instant::now(); - let tx = conn.transaction()?; - { - // Clear out table - tx.execute("DELETE FROM tag;", [])?; - let mut stmt = tx.prepare("select id, content from event order by id;")?; - let mut tag_rows = stmt.query([])?; - while let Some(row) = tag_rows.next()? { - // we want to capture the event_id that had the tag, the tag name, and the tag hex value. - let event_id: u64 = row.get(0)?; - let event_json: String = row.get(1)?; - let event: Event = serde_json::from_str(&event_json)?; - // look at each event, and each tag, creating new tag entries if appropriate. - for t in event.tags.iter().filter(|x| x.len() > 1) { - let tagname = t.get(0).unwrap(); - let tagnamechar_opt = single_char_tagname(tagname); - if tagnamechar_opt.is_none() { - continue; + match conn.execute_batch(upgrade_sql) { + Ok(()) => { + info!("database schema upgraded v4 -> v5"); + // uncomment if we have a newer version + //curr_version = 5; + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); } - // safe because len was > 1 - let tagval = t.get(1).unwrap(); - // insert as BLOB if we can restore it losslessly. - // this means it needs to be even length and lowercase. - if (tagval.len() % 2 == 0) && is_lower_hex(tagval) { - tx.execute( - "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", - params![event_id, tagname, hex::decode(&tagval).ok()], - )?; - } else { - // otherwise, insert as text - tx.execute( - "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);", - params![event_id, tagname, &tagval], - )?; + } + } + + if curr_version == 5 { + info!("database schema needs update from 5->6"); + // We need to rebuild the tags table. iterate through the + // event table. build event from json, insert tags into a + // fresh tag table. This was needed due to a logic error in + // how hex-like tags got indexed. + let start = Instant::now(); + let tx = conn.transaction()?; + { + // Clear out table + tx.execute("DELETE FROM tag;", [])?; + let mut stmt = tx.prepare("select id, content from event order by id;")?; + let mut tag_rows = stmt.query([])?; + while let Some(row) = tag_rows.next()? { + // we want to capture the event_id that had the tag, the tag name, and the tag hex value. + let event_id: u64 = row.get(0)?; + let event_json: String = row.get(1)?; + let event: Event = serde_json::from_str(&event_json)?; + // look at each event, and each tag, creating new tag entries if appropriate. + for t in event.tags.iter().filter(|x| x.len() > 1) { + let tagname = t.get(0).unwrap(); + let tagnamechar_opt = single_char_tagname(tagname); + if tagnamechar_opt.is_none() { + continue; + } + // safe because len was > 1 + let tagval = t.get(1).unwrap(); + // insert as BLOB if we can restore it losslessly. + // this means it needs to be even length and lowercase. + if (tagval.len() % 2 == 0) && is_lower_hex(tagval) { + tx.execute( + "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", + params![event_id, tagname, hex::decode(&tagval).ok()], + )?; + } else { + // otherwise, insert as text + tx.execute( + "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);", + params![event_id, tagname, &tagval], + )?; + } + } } + tx.execute("PRAGMA user_version = 6;", [])?; } + tx.commit()?; + info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed()); + // vacuum after large table modification + let start = Instant::now(); + conn.execute("VACUUM;", [])?; + info!("vacuumed DB after tags rebuild in {:?}", start.elapsed()); } - tx.execute("PRAGMA user_version = 6;", [])?; } - tx.commit()?; - info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed()); - // vacuum after large table modification - let start = Instant::now(); - conn.execute("VACUUM;", [])?; - info!("vacuumed DB after tags rebuild in {:?}", start.elapsed()); - } else if curr_version == 6 { - debug!("Database version was already current (v6)"); - } else if curr_version > 6 { - panic!( - "Database version is newer than supported by this executable (v{})", - curr_version - ); + // Database is current, all is good + Ordering::Equal => { + debug!("Database version was already current (v{})", DB_VERSION); + } + // Database is newer than what this code understands, abort + Ordering::Greater => { + panic!( + "Database version is newer than supported by this executable (v{} > v{})", + curr_version, DB_VERSION + ); + } } // Setup PRAGMA