schema.rs (13589B)
1 //! Database schema and migrations 2 use crate::db::PooledConnection; 3 use crate::error::Result; 4 use crate::event::{single_char_tagname, Event}; 5 use crate::utils::is_lower_hex; 6 use const_format::formatcp; 7 use rusqlite::limits::Limit; 8 use rusqlite::params; 9 use rusqlite::Connection; 10 use std::cmp::Ordering; 11 use std::time::Instant; 12 use tracing::{debug, error, info}; 13 14 /// Startup DB Pragmas 15 pub const STARTUP_SQL: &str = r##" 16 PRAGMA main.synchronous=NORMAL; 17 PRAGMA foreign_keys = ON; 18 PRAGMA journal_size_limit=32768; 19 pragma mmap_size = 536870912; -- 512MB of mmap 20 "##; 21 22 /// Latest database version 23 pub const DB_VERSION: usize = 7; 24 25 /// Schema definition 26 const INIT_SQL: &str = formatcp!( 27 r##" 28 -- Database settings 29 PRAGMA encoding = "UTF-8"; 30 PRAGMA journal_mode=WAL; 31 PRAGMA main.synchronous=NORMAL; 32 PRAGMA foreign_keys = ON; 33 PRAGMA application_id = 1654008667; 34 PRAGMA user_version = {}; 35 36 -- Event Table 37 CREATE TABLE IF NOT EXISTS event ( 38 id INTEGER PRIMARY KEY, 39 event_hash BLOB NOT NULL, -- 4-byte hash 40 first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970) 41 created_at INTEGER NOT NULL, -- when the event was authored 42 author BLOB NOT NULL, -- author pubkey 43 delegated_by BLOB, -- delegator pubkey (NIP-26) 44 kind INTEGER NOT NULL, -- event kind 45 hidden INTEGER, -- relevant for queries 46 content TEXT NOT NULL -- serialized json of event object 47 ); 48 49 -- Event Indexes 50 CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash); 51 CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at); 52 CREATE INDEX IF NOT EXISTS author_index ON event(author); 53 CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by); 54 CREATE INDEX IF NOT EXISTS kind_index ON event(kind); 55 56 -- Tag Table 57 -- Tag values are stored as either a BLOB (if they come in as a 58 -- hex-string), or TEXT otherwise. 59 -- This means that searches need to select the appropriate column. 60 CREATE TABLE IF NOT EXISTS tag ( 61 id INTEGER PRIMARY KEY, 62 event_id INTEGER NOT NULL, -- an event ID that contains a tag. 63 name TEXT, -- the tag name ("p", "e", whatever) 64 value TEXT, -- the tag value, if not hex. 65 value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string. 66 FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE 67 ); 68 CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value); 69 CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex); 70 71 -- NIP-05 User Validation 72 CREATE TABLE IF NOT EXISTS user_verification ( 73 id INTEGER PRIMARY KEY, 74 metadata_event INTEGER NOT NULL, -- the metadata event used for this validation. 75 name TEXT NOT NULL, -- the nip05 field value (user@domain). 76 verified_at INTEGER, -- timestamp this author/nip05 was most recently verified. 77 failed_at INTEGER, -- timestamp a verification attempt failed (host down). 78 failure_count INTEGER DEFAULT 0, -- number of consecutive failures. 79 FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE 80 ); 81 CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name); 82 CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event); 83 "##, 84 DB_VERSION 85 ); 86 87 /// Determine the current application database schema version. 88 pub fn curr_db_version(conn: &mut Connection) -> Result<usize> { 89 let query = "PRAGMA user_version;"; 90 let curr_version = conn.query_row(query, [], |row| row.get(0))?; 91 Ok(curr_version) 92 } 93 94 fn mig_init(conn: &mut PooledConnection) -> Result<usize> { 95 match conn.execute_batch(INIT_SQL) { 96 Ok(()) => { 97 info!( 98 "database pragma/schema initialized to v{}, and ready", 99 DB_VERSION 100 ); 101 } 102 Err(err) => { 103 error!("update failed: {}", err); 104 panic!("database could not be initialized"); 105 } 106 } 107 Ok(DB_VERSION) 108 } 109 110 /// Upgrade DB to latest version, and execute pragma settings 111 pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> { 112 // check the version. 113 let mut curr_version = curr_db_version(conn)?; 114 info!("DB version = {:?}", curr_version); 115 116 debug!( 117 "SQLite max query parameters: {}", 118 conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER) 119 ); 120 debug!( 121 "SQLite max table/blob/text length: {} MB", 122 (conn.limit(Limit::SQLITE_LIMIT_LENGTH) as f64 / (1024 * 1024) as f64).floor() 123 ); 124 debug!( 125 "SQLite max SQL length: {} MB", 126 (conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor() 127 ); 128 129 match curr_version.cmp(&DB_VERSION) { 130 // Database is new or not current 131 Ordering::Less => { 132 // initialize from scratch 133 if curr_version == 0 { 134 curr_version = mig_init(conn)?; 135 } 136 // for initialized but out-of-date schemas, proceed to 137 // upgrade sequentially until we are current. 138 if curr_version == 1 { 139 curr_version = mig_1_to_2(conn)?; 140 } 141 142 if curr_version == 2 { 143 curr_version = mig_2_to_3(conn)?; 144 } 145 146 if curr_version == 3 { 147 curr_version = mig_3_to_4(conn)?; 148 } 149 150 if curr_version == 4 { 151 curr_version = mig_4_to_5(conn)?; 152 } 153 154 if curr_version == 5 { 155 curr_version = mig_5_to_6(conn)?; 156 } 157 if curr_version == 6 { 158 curr_version = mig_6_to_7(conn)?; 159 } 160 if curr_version == DB_VERSION { 161 info!( 162 "All migration scripts completed successfully. Welcome to v{}.", 163 DB_VERSION 164 ); 165 } 166 } 167 // Database is current, all is good 168 Ordering::Equal => { 169 debug!("Database version was already current (v{})", DB_VERSION); 170 } 171 // Database is newer than what this code understands, abort 172 Ordering::Greater => { 173 panic!( 174 "Database version is newer than supported by this executable (v{} > v{})", 175 curr_version, DB_VERSION 176 ); 177 } 178 } 179 180 // Setup PRAGMA 181 conn.execute_batch(STARTUP_SQL)?; 182 debug!("SQLite PRAGMA startup completed"); 183 Ok(()) 184 } 185 186 //// Migration Scripts 187 188 fn mig_1_to_2(conn: &mut PooledConnection) -> Result<usize> { 189 // only change is adding a hidden column to events. 190 let upgrade_sql = r##" 191 ALTER TABLE event ADD hidden INTEGER; 192 UPDATE event SET hidden=FALSE; 193 PRAGMA user_version = 2; 194 "##; 195 match conn.execute_batch(upgrade_sql) { 196 Ok(()) => { 197 info!("database schema upgraded v1 -> v2"); 198 } 199 Err(err) => { 200 error!("update failed: {}", err); 201 panic!("database could not be upgraded"); 202 } 203 } 204 Ok(2) 205 } 206 207 fn mig_2_to_3(conn: &mut PooledConnection) -> Result<usize> { 208 // this version lacks the tag column 209 info!("database schema needs update from 2->3"); 210 let upgrade_sql = r##" 211 CREATE TABLE IF NOT EXISTS tag ( 212 id INTEGER PRIMARY KEY, 213 event_id INTEGER NOT NULL, -- an event ID that contains a tag. 214 name TEXT, -- the tag name ("p", "e", whatever) 215 value TEXT, -- the tag value, if not hex. 216 value_hex BLOB, -- the tag value, if it can be interpreted as a hex string. 217 FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE 218 ); 219 PRAGMA user_version = 3; 220 "##; 221 // TODO: load existing refs into tag table 222 match conn.execute_batch(upgrade_sql) { 223 Ok(()) => { 224 info!("database schema upgraded v2 -> v3"); 225 } 226 Err(err) => { 227 error!("update failed: {}", err); 228 panic!("database could not be upgraded"); 229 } 230 } 231 // iterate over every event/pubkey tag 232 let tx = conn.transaction()?; 233 { 234 let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?; 235 let mut tag_rows = stmt.query([])?; 236 while let Some(row) = tag_rows.next()? { 237 // we want to capture the event_id that had the tag, the tag name, and the tag hex value. 238 let event_id: u64 = row.get(0)?; 239 let tag_name: String = row.get(1)?; 240 let tag_value: String = row.get(2)?; 241 // this will leave behind p/e tags that were non-hex, but they are invalid anyways. 242 if is_lower_hex(&tag_value) { 243 tx.execute( 244 "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", 245 params![event_id, tag_name, hex::decode(&tag_value).ok()], 246 )?; 247 } 248 } 249 } 250 info!("Updated tag values"); 251 tx.commit()?; 252 Ok(3) 253 } 254 255 fn mig_3_to_4(conn: &mut PooledConnection) -> Result<usize> { 256 info!("database schema needs update from 3->4"); 257 let upgrade_sql = r##" 258 -- incoming metadata events with nip05 259 CREATE TABLE IF NOT EXISTS user_verification ( 260 id INTEGER PRIMARY KEY, 261 metadata_event INTEGER NOT NULL, -- the metadata event used for this validation. 262 name TEXT NOT NULL, -- the nip05 field value (user@domain). 263 verified_at INTEGER, -- timestamp this author/nip05 was most recently verified. 264 failed_at INTEGER, -- timestamp a verification attempt failed (host down). 265 failure_count INTEGER DEFAULT 0, -- number of consecutive failures. 266 FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE 267 ); 268 CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name); 269 CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event); 270 PRAGMA user_version = 4; 271 "##; 272 match conn.execute_batch(upgrade_sql) { 273 Ok(()) => { 274 info!("database schema upgraded v3 -> v4"); 275 } 276 Err(err) => { 277 error!("update failed: {}", err); 278 panic!("database could not be upgraded"); 279 } 280 } 281 Ok(4) 282 } 283 284 fn mig_4_to_5(conn: &mut PooledConnection) -> Result<usize> { 285 info!("database schema needs update from 4->5"); 286 let upgrade_sql = r##" 287 DROP TABLE IF EXISTS event_ref; 288 DROP TABLE IF EXISTS pubkey_ref; 289 PRAGMA user_version=5; 290 "##; 291 match conn.execute_batch(upgrade_sql) { 292 Ok(()) => { 293 info!("database schema upgraded v4 -> v5"); 294 } 295 Err(err) => { 296 error!("update failed: {}", err); 297 panic!("database could not be upgraded"); 298 } 299 } 300 Ok(5) 301 } 302 303 fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> { 304 info!("database schema needs update from 5->6"); 305 // We need to rebuild the tags table. iterate through the 306 // event table. build event from json, insert tags into a 307 // fresh tag table. This was needed due to a logic error in 308 // how hex-like tags got indexed. 309 let start = Instant::now(); 310 let tx = conn.transaction()?; 311 { 312 // Clear out table 313 tx.execute("DELETE FROM tag;", [])?; 314 let mut stmt = tx.prepare("select id, content from event order by id;")?; 315 let mut tag_rows = stmt.query([])?; 316 while let Some(row) = tag_rows.next()? { 317 // we want to capture the event_id that had the tag, the tag name, and the tag hex value. 318 let event_id: u64 = row.get(0)?; 319 let event_json: String = row.get(1)?; 320 let event: Event = serde_json::from_str(&event_json)?; 321 // look at each event, and each tag, creating new tag entries if appropriate. 322 for t in event.tags.iter().filter(|x| x.len() > 1) { 323 let tagname = t.get(0).unwrap(); 324 let tagnamechar_opt = single_char_tagname(tagname); 325 if tagnamechar_opt.is_none() { 326 continue; 327 } 328 // safe because len was > 1 329 let tagval = t.get(1).unwrap(); 330 // insert as BLOB if we can restore it losslessly. 331 // this means it needs to be even length and lowercase. 332 if (tagval.len() % 2 == 0) && is_lower_hex(tagval) { 333 tx.execute( 334 "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", 335 params![event_id, tagname, hex::decode(tagval).ok()], 336 )?; 337 } else { 338 // otherwise, insert as text 339 tx.execute( 340 "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);", 341 params![event_id, tagname, &tagval], 342 )?; 343 } 344 } 345 } 346 tx.execute("PRAGMA user_version = 6;", [])?; 347 } 348 tx.commit()?; 349 info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed()); 350 // vacuum after large table modification 351 let start = Instant::now(); 352 conn.execute("VACUUM;", [])?; 353 info!("vacuumed DB after tags rebuild in {:?}", start.elapsed()); 354 Ok(6) 355 } 356 357 fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> { 358 info!("database schema needs update from 6->7"); 359 // only change is adding a hidden column to events. 360 let upgrade_sql = r##" 361 ALTER TABLE event ADD delegated_by BLOB; 362 CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by); 363 PRAGMA user_version = 7; 364 "##; 365 match conn.execute_batch(upgrade_sql) { 366 Ok(()) => { 367 info!("database schema upgraded v6 -> v7"); 368 } 369 Err(err) => { 370 error!("update failed: {}", err); 371 panic!("database could not be upgraded"); 372 } 373 } 374 Ok(7) 375 }