nostr-rs-relay

My dev fork of nostr-rs-relay
git clone git://jb55.com/nostr-rs-relay
Log | Files | Refs | README | LICENSE

schema.rs (13589B)


      1 //! Database schema and migrations
      2 use crate::db::PooledConnection;
      3 use crate::error::Result;
      4 use crate::event::{single_char_tagname, Event};
      5 use crate::utils::is_lower_hex;
      6 use const_format::formatcp;
      7 use rusqlite::limits::Limit;
      8 use rusqlite::params;
      9 use rusqlite::Connection;
     10 use std::cmp::Ordering;
     11 use std::time::Instant;
     12 use tracing::{debug, error, info};
     13 
     14 /// Startup DB Pragmas
     15 pub const STARTUP_SQL: &str = r##"
     16 PRAGMA main.synchronous=NORMAL;
     17 PRAGMA foreign_keys = ON;
     18 PRAGMA journal_size_limit=32768;
     19 pragma mmap_size = 536870912; -- 512MB of mmap
     20 "##;
     21 
     22 /// Latest database version
     23 pub const DB_VERSION: usize = 7;
     24 
     25 /// Schema definition
     26 const INIT_SQL: &str = formatcp!(
     27     r##"
     28 -- Database settings
     29 PRAGMA encoding = "UTF-8";
     30 PRAGMA journal_mode=WAL;
     31 PRAGMA main.synchronous=NORMAL;
     32 PRAGMA foreign_keys = ON;
     33 PRAGMA application_id = 1654008667;
     34 PRAGMA user_version = {};
     35 
     36 -- Event Table
     37 CREATE TABLE IF NOT EXISTS event (
     38 id INTEGER PRIMARY KEY,
     39 event_hash BLOB NOT NULL, -- 4-byte hash
     40 first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
     41 created_at INTEGER NOT NULL, -- when the event was authored
     42 author BLOB NOT NULL, -- author pubkey
     43 delegated_by BLOB, -- delegator pubkey (NIP-26)
     44 kind INTEGER NOT NULL, -- event kind
     45 hidden INTEGER, -- relevant for queries
     46 content TEXT NOT NULL -- serialized json of event object
     47 );
     48 
     49 -- Event Indexes
     50 CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
     51 CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
     52 CREATE INDEX IF NOT EXISTS author_index ON event(author);
     53 CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
     54 CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
     55 
     56 -- Tag Table
     57 -- Tag values are stored as either a BLOB (if they come in as a
     58 -- hex-string), or TEXT otherwise.
     59 -- This means that searches need to select the appropriate column.
     60 CREATE TABLE IF NOT EXISTS tag (
     61 id INTEGER PRIMARY KEY,
     62 event_id INTEGER NOT NULL, -- an event ID that contains a tag.
     63 name TEXT, -- the tag name ("p", "e", whatever)
     64 value TEXT, -- the tag value, if not hex.
     65 value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string.
     66 FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
     67 );
     68 CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
     69 CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
     70 
     71 -- NIP-05 User Validation
     72 CREATE TABLE IF NOT EXISTS user_verification (
     73 id INTEGER PRIMARY KEY,
     74 metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
     75 name TEXT NOT NULL, -- the nip05 field value (user@domain).
     76 verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
     77 failed_at INTEGER, -- timestamp a verification attempt failed (host down).
     78 failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
     79 FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
     80 );
     81 CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name);
     82 CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event);
     83 "##,
     84     DB_VERSION
     85 );
     86 
     87 /// Determine the current application database schema version.
     88 pub fn curr_db_version(conn: &mut Connection) -> Result<usize> {
     89     let query = "PRAGMA user_version;";
     90     let curr_version = conn.query_row(query, [], |row| row.get(0))?;
     91     Ok(curr_version)
     92 }
     93 
     94 fn mig_init(conn: &mut PooledConnection) -> Result<usize> {
     95     match conn.execute_batch(INIT_SQL) {
     96         Ok(()) => {
     97             info!(
     98                 "database pragma/schema initialized to v{}, and ready",
     99                 DB_VERSION
    100             );
    101         }
    102         Err(err) => {
    103             error!("update failed: {}", err);
    104             panic!("database could not be initialized");
    105         }
    106     }
    107     Ok(DB_VERSION)
    108 }
    109 
    110 /// Upgrade DB to latest version, and execute pragma settings
    111 pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
    112     // check the version.
    113     let mut curr_version = curr_db_version(conn)?;
    114     info!("DB version = {:?}", curr_version);
    115 
    116     debug!(
    117         "SQLite max query parameters: {}",
    118         conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER)
    119     );
    120     debug!(
    121         "SQLite max table/blob/text length: {} MB",
    122         (conn.limit(Limit::SQLITE_LIMIT_LENGTH) as f64 / (1024 * 1024) as f64).floor()
    123     );
    124     debug!(
    125         "SQLite max SQL length: {} MB",
    126         (conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor()
    127     );
    128 
    129     match curr_version.cmp(&DB_VERSION) {
    130         // Database is new or not current
    131         Ordering::Less => {
    132             // initialize from scratch
    133             if curr_version == 0 {
    134                 curr_version = mig_init(conn)?;
    135             }
    136             // for initialized but out-of-date schemas, proceed to
    137             // upgrade sequentially until we are current.
    138             if curr_version == 1 {
    139                 curr_version = mig_1_to_2(conn)?;
    140             }
    141 
    142             if curr_version == 2 {
    143                 curr_version = mig_2_to_3(conn)?;
    144             }
    145 
    146             if curr_version == 3 {
    147                 curr_version = mig_3_to_4(conn)?;
    148             }
    149 
    150             if curr_version == 4 {
    151                 curr_version = mig_4_to_5(conn)?;
    152             }
    153 
    154             if curr_version == 5 {
    155                 curr_version = mig_5_to_6(conn)?;
    156             }
    157             if curr_version == 6 {
    158                 curr_version = mig_6_to_7(conn)?;
    159             }
    160             if curr_version == DB_VERSION {
    161                 info!(
    162                     "All migration scripts completed successfully.  Welcome to v{}.",
    163                     DB_VERSION
    164                 );
    165             }
    166         }
    167         // Database is current, all is good
    168         Ordering::Equal => {
    169             debug!("Database version was already current (v{})", DB_VERSION);
    170         }
    171         // Database is newer than what this code understands, abort
    172         Ordering::Greater => {
    173             panic!(
    174                 "Database version is newer than supported by this executable (v{} > v{})",
    175                 curr_version, DB_VERSION
    176             );
    177         }
    178     }
    179 
    180     // Setup PRAGMA
    181     conn.execute_batch(STARTUP_SQL)?;
    182     debug!("SQLite PRAGMA startup completed");
    183     Ok(())
    184 }
    185 
    186 //// Migration Scripts
    187 
    188 fn mig_1_to_2(conn: &mut PooledConnection) -> Result<usize> {
    189     // only change is adding a hidden column to events.
    190     let upgrade_sql = r##"
    191 ALTER TABLE event ADD hidden INTEGER;
    192 UPDATE event SET hidden=FALSE;
    193 PRAGMA user_version = 2;
    194 "##;
    195     match conn.execute_batch(upgrade_sql) {
    196         Ok(()) => {
    197             info!("database schema upgraded v1 -> v2");
    198         }
    199         Err(err) => {
    200             error!("update failed: {}", err);
    201             panic!("database could not be upgraded");
    202         }
    203     }
    204     Ok(2)
    205 }
    206 
    207 fn mig_2_to_3(conn: &mut PooledConnection) -> Result<usize> {
    208     // this version lacks the tag column
    209     info!("database schema needs update from 2->3");
    210     let upgrade_sql = r##"
    211 CREATE TABLE IF NOT EXISTS tag (
    212 id INTEGER PRIMARY KEY,
    213 event_id INTEGER NOT NULL, -- an event ID that contains a tag.
    214 name TEXT, -- the tag name ("p", "e", whatever)
    215 value TEXT, -- the tag value, if not hex.
    216 value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
    217 FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
    218 );
    219 PRAGMA user_version = 3;
    220 "##;
    221     // TODO: load existing refs into tag table
    222     match conn.execute_batch(upgrade_sql) {
    223         Ok(()) => {
    224             info!("database schema upgraded v2 -> v3");
    225         }
    226         Err(err) => {
    227             error!("update failed: {}", err);
    228             panic!("database could not be upgraded");
    229         }
    230     }
    231     // iterate over every event/pubkey tag
    232     let tx = conn.transaction()?;
    233     {
    234         let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?;
    235         let mut tag_rows = stmt.query([])?;
    236         while let Some(row) = tag_rows.next()? {
    237             // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
    238             let event_id: u64 = row.get(0)?;
    239             let tag_name: String = row.get(1)?;
    240             let tag_value: String = row.get(2)?;
    241             // this will leave behind p/e tags that were non-hex, but they are invalid anyways.
    242             if is_lower_hex(&tag_value) {
    243                 tx.execute(
    244                     "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
    245                     params![event_id, tag_name, hex::decode(&tag_value).ok()],
    246                 )?;
    247             }
    248         }
    249     }
    250     info!("Updated tag values");
    251     tx.commit()?;
    252     Ok(3)
    253 }
    254 
    255 fn mig_3_to_4(conn: &mut PooledConnection) -> Result<usize> {
    256     info!("database schema needs update from 3->4");
    257     let upgrade_sql = r##"
    258 -- incoming metadata events with nip05
    259 CREATE TABLE IF NOT EXISTS user_verification (
    260 id INTEGER PRIMARY KEY,
    261 metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
    262 name TEXT NOT NULL, -- the nip05 field value (user@domain).
    263 verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
    264 failed_at INTEGER, -- timestamp a verification attempt failed (host down).
    265 failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
    266 FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
    267 );
    268 CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name);
    269 CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event);
    270 PRAGMA user_version = 4;
    271 "##;
    272     match conn.execute_batch(upgrade_sql) {
    273         Ok(()) => {
    274             info!("database schema upgraded v3 -> v4");
    275         }
    276         Err(err) => {
    277             error!("update failed: {}", err);
    278             panic!("database could not be upgraded");
    279         }
    280     }
    281     Ok(4)
    282 }
    283 
    284 fn mig_4_to_5(conn: &mut PooledConnection) -> Result<usize> {
    285     info!("database schema needs update from 4->5");
    286     let upgrade_sql = r##"
    287 DROP TABLE IF EXISTS event_ref;
    288 DROP TABLE IF EXISTS pubkey_ref;
    289 PRAGMA user_version=5;
    290 "##;
    291     match conn.execute_batch(upgrade_sql) {
    292         Ok(()) => {
    293             info!("database schema upgraded v4 -> v5");
    294         }
    295         Err(err) => {
    296             error!("update failed: {}", err);
    297             panic!("database could not be upgraded");
    298         }
    299     }
    300     Ok(5)
    301 }
    302 
    303 fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
    304     info!("database schema needs update from 5->6");
    305     // We need to rebuild the tags table.  iterate through the
    306     // event table.  build event from json, insert tags into a
    307     // fresh tag table.  This was needed due to a logic error in
    308     // how hex-like tags got indexed.
    309     let start = Instant::now();
    310     let tx = conn.transaction()?;
    311     {
    312         // Clear out table
    313         tx.execute("DELETE FROM tag;", [])?;
    314         let mut stmt = tx.prepare("select id, content from event order by id;")?;
    315         let mut tag_rows = stmt.query([])?;
    316         while let Some(row) = tag_rows.next()? {
    317             // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
    318             let event_id: u64 = row.get(0)?;
    319             let event_json: String = row.get(1)?;
    320             let event: Event = serde_json::from_str(&event_json)?;
    321             // look at each event, and each tag, creating new tag entries if appropriate.
    322             for t in event.tags.iter().filter(|x| x.len() > 1) {
    323                 let tagname = t.get(0).unwrap();
    324                 let tagnamechar_opt = single_char_tagname(tagname);
    325                 if tagnamechar_opt.is_none() {
    326                     continue;
    327                 }
    328                 // safe because len was > 1
    329                 let tagval = t.get(1).unwrap();
    330                 // insert as BLOB if we can restore it losslessly.
    331                 // this means it needs to be even length and lowercase.
    332                 if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
    333                     tx.execute(
    334                         "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
    335                         params![event_id, tagname, hex::decode(tagval).ok()],
    336                     )?;
    337                 } else {
    338                     // otherwise, insert as text
    339                     tx.execute(
    340                         "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
    341                         params![event_id, tagname, &tagval],
    342                     )?;
    343                 }
    344             }
    345         }
    346         tx.execute("PRAGMA user_version = 6;", [])?;
    347     }
    348     tx.commit()?;
    349     info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed());
    350     // vacuum after large table modification
    351     let start = Instant::now();
    352     conn.execute("VACUUM;", [])?;
    353     info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
    354     Ok(6)
    355 }
    356 
    357 fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> {
    358     info!("database schema needs update from 6->7");
    359     // only change is adding a hidden column to events.
    360     let upgrade_sql = r##"
    361 ALTER TABLE event ADD delegated_by BLOB;
    362 CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
    363 PRAGMA user_version = 7;
    364 "##;
    365     match conn.execute_batch(upgrade_sql) {
    366         Ok(()) => {
    367             info!("database schema upgraded v6 -> v7");
    368         }
    369         Err(err) => {
    370             error!("update failed: {}", err);
    371             panic!("database could not be upgraded");
    372         }
    373     }
    374     Ok(7)
    375 }