// db.rs (26881B)
1 //! Event persistence and querying 2 //use crate::config::SETTINGS; 3 use crate::config::Settings; 4 use crate::error::{Error, Result}; 5 use crate::event::{single_char_tagname, Event}; 6 use crate::hexrange::hex_range; 7 use crate::hexrange::HexSearch; 8 use crate::nip05; 9 use crate::schema::{upgrade_db, STARTUP_SQL}; 10 use crate::subscription::ReqFilter; 11 use crate::subscription::Subscription; 12 use crate::utils::{is_hex, is_lower_hex}; 13 use governor::clock::Clock; 14 use governor::{Quota, RateLimiter}; 15 use hex; 16 use r2d2; 17 use r2d2_sqlite::SqliteConnectionManager; 18 use rusqlite::params; 19 use rusqlite::types::ToSql; 20 use rusqlite::OpenFlags; 21 use std::fmt::Write as _; 22 use std::path::Path; 23 use std::thread; 24 use std::time::Duration; 25 use std::time::Instant; 26 use tokio::task; 27 use tracing::{debug, info, trace, warn}; 28 29 pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>; 30 pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>; 31 32 /// Events submitted from a client, with a return channel for notices 33 pub struct SubmittedEvent { 34 pub event: Event, 35 pub notice_tx: tokio::sync::mpsc::Sender<String>, 36 } 37 38 /// Database file 39 pub const DB_FILE: &str = "nostr.db"; 40 41 /// Build a database connection pool. 42 /// # Panics 43 /// 44 /// Will panic if the pool could not be created. 45 #[must_use] 46 pub fn build_pool( 47 name: &str, 48 settings: &Settings, 49 flags: OpenFlags, 50 min_size: u32, 51 max_size: u32, 52 wait_for_db: bool, 53 ) -> SqlitePool { 54 let db_dir = &settings.database.data_directory; 55 let full_path = Path::new(db_dir).join(DB_FILE); 56 // small hack; if the database doesn't exist yet, that means the 57 // writer thread hasn't finished. Give it a chance to work. This 58 // is only an issue with the first time we run. 
59 if !settings.database.in_memory { 60 while !full_path.exists() && wait_for_db { 61 debug!("Database reader pool is waiting on the database to be created..."); 62 thread::sleep(Duration::from_millis(500)); 63 } 64 } 65 let manager = if settings.database.in_memory { 66 SqliteConnectionManager::memory() 67 .with_flags(flags) 68 .with_init(|c| c.execute_batch(STARTUP_SQL)) 69 } else { 70 SqliteConnectionManager::file(&full_path) 71 .with_flags(flags) 72 .with_init(|c| c.execute_batch(STARTUP_SQL)) 73 }; 74 let pool: SqlitePool = r2d2::Pool::builder() 75 .test_on_check_out(true) // no noticeable performance hit 76 .min_idle(Some(min_size)) 77 .max_size(max_size) 78 .build(manager) 79 .unwrap(); 80 info!( 81 "Built a connection pool {:?} (min={}, max={})", 82 name, min_size, max_size 83 ); 84 pool 85 } 86 87 /// Spawn a database writer that persists events to the SQLite store. 88 pub async fn db_writer( 89 settings: Settings, 90 mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>, 91 bcast_tx: tokio::sync::broadcast::Sender<Event>, 92 metadata_tx: tokio::sync::broadcast::Sender<Event>, 93 mut shutdown: tokio::sync::broadcast::Receiver<()>, 94 ) -> tokio::task::JoinHandle<Result<()>> { 95 // are we performing NIP-05 checking? 96 let nip05_active = settings.verified_users.is_active(); 97 // are we requriing NIP-05 user verification? 
98 let nip05_enabled = settings.verified_users.is_enabled(); 99 100 task::spawn_blocking(move || { 101 let db_dir = &settings.database.data_directory; 102 let full_path = Path::new(db_dir).join(DB_FILE); 103 // create a connection pool 104 let pool = build_pool( 105 "event writer", 106 &settings, 107 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, 108 1, 109 4, 110 false, 111 ); 112 if settings.database.in_memory { 113 info!("using in-memory database, this will not persist a restart!"); 114 } else { 115 info!("opened database {:?} for writing", full_path); 116 } 117 upgrade_db(&mut pool.get()?)?; 118 119 // Make a copy of the whitelist 120 let whitelist = &settings.authorization.pubkey_whitelist.clone(); 121 122 // get rate limit settings 123 let rps_setting = settings.limits.messages_per_sec; 124 let mut most_recent_rate_limit = Instant::now(); 125 let mut lim_opt = None; 126 let clock = governor::clock::QuantaClock::default(); 127 if let Some(rps) = rps_setting { 128 if rps > 0 { 129 info!("Enabling rate limits for event creation ({}/sec)", rps); 130 let quota = core::num::NonZeroU32::new(rps * 60).unwrap(); 131 lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota))); 132 } 133 } 134 loop { 135 if shutdown.try_recv().is_ok() { 136 info!("shutting down database writer"); 137 break; 138 } 139 // call blocking read on channel 140 let next_event = event_rx.blocking_recv(); 141 // if the channel has closed, we will never get work 142 if next_event.is_none() { 143 break; 144 } 145 // track if an event write occurred; this is used to 146 // update the rate limiter 147 let mut event_write = false; 148 let subm_event = next_event.unwrap(); 149 let event = subm_event.event; 150 let notice_tx = subm_event.notice_tx; 151 // check if this event is authorized. 152 if let Some(allowed_addrs) = whitelist { 153 // TODO: incorporate delegated pubkeys 154 // if the event address is not in allowed_addrs. 
155 if !allowed_addrs.contains(&event.pubkey) { 156 info!( 157 "Rejecting event {}, unauthorized author", 158 event.get_event_id_prefix() 159 ); 160 notice_tx 161 .try_send("pubkey is not allowed to publish to this relay".to_owned()) 162 .ok(); 163 continue; 164 } 165 } 166 167 // send any metadata events to the NIP-05 verifier 168 if nip05_active && event.is_kind_metadata() { 169 // we are sending this prior to even deciding if we 170 // persist it. this allows the nip05 module to 171 // inspect it, update if necessary, or persist a new 172 // event and broadcast it itself. 173 metadata_tx.send(event.clone()).ok(); 174 } 175 176 // check for NIP-05 verification 177 if nip05_enabled { 178 match nip05::query_latest_user_verification(pool.get()?, event.pubkey.to_owned()) { 179 Ok(uv) => { 180 if uv.is_valid(&settings.verified_users) { 181 info!( 182 "new event from verified author ({:?},{:?})", 183 uv.name.to_string(), 184 event.get_author_prefix() 185 ); 186 } else { 187 info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)", 188 uv.name.to_string(), 189 event.get_author_prefix() 190 ); 191 notice_tx 192 .try_send( 193 "NIP-05 verification is no longer valid (expired/wrong domain)" 194 .to_owned(), 195 ) 196 .ok(); 197 continue; 198 } 199 } 200 Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => { 201 debug!( 202 "no verification records found for pubkey: {:?}", 203 event.get_author_prefix() 204 ); 205 notice_tx 206 .try_send("NIP-05 verification needed to publish events".to_owned()) 207 .ok(); 208 continue; 209 } 210 Err(e) => { 211 warn!("checking nip05 verification status failed: {:?}", e); 212 continue; 213 } 214 } 215 } 216 // TODO: cache recent list of authors to remove a DB call. 
217 let start = Instant::now(); 218 if event.kind >= 20000 && event.kind < 30000 { 219 bcast_tx.send(event.clone()).ok(); 220 info!( 221 "published ephemeral event: {:?} from: {:?} in: {:?}", 222 event.get_event_id_prefix(), 223 event.get_author_prefix(), 224 start.elapsed() 225 ); 226 event_write = true 227 } else { 228 match write_event(&mut pool.get()?, &event) { 229 Ok(updated) => { 230 if updated == 0 { 231 trace!("ignoring duplicate or deleted event"); 232 } else { 233 info!( 234 "persisted event: {:?} from: {:?} in: {:?}", 235 event.get_event_id_prefix(), 236 event.get_author_prefix(), 237 start.elapsed() 238 ); 239 event_write = true; 240 // send this out to all clients 241 bcast_tx.send(event.clone()).ok(); 242 } 243 } 244 Err(err) => { 245 warn!("event insert failed: {:?}", err); 246 notice_tx 247 .try_send( 248 "relay experienced an error trying to publish the latest event" 249 .to_owned(), 250 ) 251 .ok(); 252 } 253 } 254 } 255 256 // use rate limit, if defined, and if an event was actually written. 257 if event_write { 258 if let Some(ref lim) = lim_opt { 259 if let Err(n) = lim.check() { 260 let wait_for = n.wait_time_from(clock.now()); 261 // check if we have recently logged rate 262 // limits, but print out a message only once 263 // per second. 264 if most_recent_rate_limit.elapsed().as_secs() > 10 { 265 warn!( 266 "rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds)", 267 wait_for 268 ); 269 // reset last rate limit message 270 most_recent_rate_limit = Instant::now(); 271 } 272 // block event writes, allowing them to queue up 273 thread::sleep(wait_for); 274 continue; 275 } 276 } 277 } 278 } 279 info!("database connection closed"); 280 Ok(()) 281 }) 282 } 283 284 /// Persist an event to the database, returning rows added. 
285 pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> { 286 // start transaction 287 let tx = conn.transaction()?; 288 // get relevant fields from event and convert to blobs. 289 let id_blob = hex::decode(&e.id).ok(); 290 let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok(); 291 let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok()); 292 let event_str = serde_json::to_string(&e).ok(); 293 // ignore if the event hash is a duplicate. 294 let mut ins_count = tx.execute( 295 "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);", 296 params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str] 297 )?; 298 if ins_count == 0 { 299 // if the event was a duplicate, no need to insert event or 300 // pubkey references. This will abort the txn. 301 return Ok(ins_count); 302 } 303 // remember primary key of the event most recently inserted. 304 let ev_id = tx.last_insert_rowid(); 305 // add all tags to the tag table 306 for tag in e.tags.iter() { 307 // ensure we have 2 values. 
308 if tag.len() >= 2 { 309 let tagname = &tag[0]; 310 let tagval = &tag[1]; 311 // only single-char tags are searchable 312 let tagchar_opt = single_char_tagname(tagname); 313 match &tagchar_opt { 314 Some(_) => { 315 // if tagvalue is lowercase hex; 316 if is_lower_hex(tagval) && (tagval.len() % 2 == 0) { 317 tx.execute( 318 "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", 319 params![ev_id, &tagname, hex::decode(tagval).ok()], 320 )?; 321 } else { 322 tx.execute( 323 "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)", 324 params![ev_id, &tagname, &tagval], 325 )?; 326 } 327 } 328 None => {} 329 } 330 } 331 } 332 // if this event is replaceable update, hide every other replaceable 333 // event with the same kind from the same author that was issued 334 // earlier than this. 335 if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) { 336 let update_count = tx.execute( 337 "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE", 338 params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at], 339 )?; 340 if update_count > 0 { 341 info!( 342 "hid {} older replaceable kind {} events for author: {:?}", 343 update_count, 344 e.kind, 345 e.get_author_prefix() 346 ); 347 } 348 } 349 // if this event is a deletion, hide the referenced events from the same author. 350 if e.kind == 5 { 351 let event_candidates = e.tag_values_by_name("e"); 352 // first parameter will be author 353 let mut params: Vec<Box<dyn ToSql>> = vec![Box::new(hex::decode(&e.pubkey)?)]; 354 event_candidates 355 .iter() 356 .filter(|x| is_hex(x) && x.len() == 64) 357 .filter_map(|x| hex::decode(x).ok()) 358 .for_each(|x| params.push(Box::new(x))); 359 let query = format!( 360 "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? 
AND event_hash IN ({})", 361 repeat_vars(params.len() - 1) 362 ); 363 let mut stmt = tx.prepare(&query)?; 364 let update_count = stmt.execute(rusqlite::params_from_iter(params))?; 365 info!( 366 "hid {} deleted events for author {:?}", 367 update_count, 368 e.get_author_prefix() 369 ); 370 } else { 371 // check if a deletion has already been recorded for this event. 372 // Only relevant for non-deletion events 373 let del_count = tx.query_row( 374 "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;", 375 params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0)); 376 // check if a the query returned a result, meaning we should 377 // hid the current event 378 if del_count.ok().is_some() { 379 // a deletion already existed, mark original event as hidden. 380 info!( 381 "hid event: {:?} due to existing deletion by author: {:?}", 382 e.get_event_id_prefix(), 383 e.get_author_prefix() 384 ); 385 let _update_count = 386 tx.execute("UPDATE event SET hidden=TRUE WHERE id=?", params![ev_id])?; 387 // event was deleted, so let caller know nothing new 388 // arrived, preventing this from being sent to active 389 // subscriptions 390 ins_count = 0; 391 } 392 } 393 tx.commit()?; 394 Ok(ins_count) 395 } 396 397 /// Serialized event associated with a specific subscription request. 398 #[derive(PartialEq, Eq, Debug, Clone)] 399 pub struct QueryResult { 400 /// Subscription identifier 401 pub sub_id: String, 402 /// Serialized event 403 pub event: String, 404 } 405 406 /// Produce a arbitrary list of '?' parameters. 407 fn repeat_vars(count: usize) -> String { 408 if count == 0 { 409 return "".to_owned(); 410 } 411 let mut s = "?,".repeat(count); 412 // Remove trailing comma 413 s.pop(); 414 s 415 } 416 417 /// Create a dynamic SQL subquery and params from a subscription filter. 418 fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) { 419 // build a dynamic SQL query. 
all user-input is either an integer 420 // (sqli-safe), or a string that is filtered to only contain 421 // hexadecimal characters. Strings that require escaping (tag 422 // names/values) use parameters. 423 424 // if the filter is malformed, don't return anything. 425 if f.force_no_match { 426 let empty_query = 427 "SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned(); 428 // query parameters for SQLite 429 let empty_params: Vec<Box<dyn ToSql>> = vec![]; 430 return (empty_query, empty_params); 431 } 432 433 let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned(); 434 // query parameters for SQLite 435 let mut params: Vec<Box<dyn ToSql>> = vec![]; 436 437 // individual filter components (single conditions such as an author or event ID) 438 let mut filter_components: Vec<String> = Vec::new(); 439 // Query for "authors", allowing prefix matches 440 if let Some(authvec) = &f.authors { 441 // take each author and convert to a hexsearch 442 let mut auth_searches: Vec<String> = vec![]; 443 for auth in authvec { 444 match hex_range(auth) { 445 Some(HexSearch::Exact(ex)) => { 446 auth_searches.push("author=? OR delegated_by=?".to_owned()); 447 params.push(Box::new(ex.clone())); 448 params.push(Box::new(ex)); 449 } 450 Some(HexSearch::Range(lower, upper)) => { 451 auth_searches.push( 452 "(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(), 453 ); 454 params.push(Box::new(lower.clone())); 455 params.push(Box::new(upper.clone())); 456 params.push(Box::new(lower)); 457 params.push(Box::new(upper)); 458 } 459 Some(HexSearch::LowerOnly(lower)) => { 460 auth_searches.push("author>? 
OR delegated_by>?".to_owned()); 461 params.push(Box::new(lower.clone())); 462 params.push(Box::new(lower)); 463 } 464 None => { 465 info!("Could not parse hex range from author {:?}", auth); 466 } 467 } 468 } 469 let authors_clause = format!("({})", auth_searches.join(" OR ")); 470 filter_components.push(authors_clause); 471 } 472 // Query for Kind 473 if let Some(ks) = &f.kinds { 474 // kind is number, no escaping needed 475 let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect(); 476 let kind_clause = format!("kind IN ({})", str_kinds.join(", ")); 477 filter_components.push(kind_clause); 478 } 479 // Query for event, allowing prefix matches 480 if let Some(idvec) = &f.ids { 481 // take each author and convert to a hexsearch 482 let mut id_searches: Vec<String> = vec![]; 483 for id in idvec { 484 match hex_range(id) { 485 Some(HexSearch::Exact(ex)) => { 486 id_searches.push("event_hash=?".to_owned()); 487 params.push(Box::new(ex)); 488 } 489 Some(HexSearch::Range(lower, upper)) => { 490 id_searches.push("(event_hash>? AND event_hash<?)".to_owned()); 491 params.push(Box::new(lower)); 492 params.push(Box::new(upper)); 493 } 494 Some(HexSearch::LowerOnly(lower)) => { 495 id_searches.push("event_hash>?".to_owned()); 496 params.push(Box::new(lower)); 497 } 498 None => { 499 info!("Could not parse hex range from id {:?}", id); 500 } 501 } 502 } 503 let id_clause = format!("({})", id_searches.join(" OR ")); 504 filter_components.push(id_clause); 505 } 506 // Query for tags 507 if let Some(map) = &f.tags { 508 for (key, val) in map.iter() { 509 let mut str_vals: Vec<Box<dyn ToSql>> = vec![]; 510 let mut blob_vals: Vec<Box<dyn ToSql>> = vec![]; 511 for v in val { 512 if (v.len() % 2 == 0) && is_lower_hex(v) { 513 if let Ok(h) = hex::decode(v) { 514 blob_vals.push(Box::new(h)); 515 } 516 } else { 517 str_vals.push(Box::new(v.to_owned())); 518 } 519 } 520 // create clauses with "?" 
params for each tag value being searched 521 let str_clause = format!("value IN ({})", repeat_vars(str_vals.len())); 522 let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len())); 523 // find evidence of the target tag name/value existing for this event. 524 let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause); 525 // add the tag name as the first parameter 526 params.push(Box::new(key.to_string())); 527 // add all tag values that are plain strings as params 528 params.append(&mut str_vals); 529 // add all tag values that are blobs as params 530 params.append(&mut blob_vals); 531 filter_components.push(tag_clause); 532 } 533 } 534 // Query for timestamp 535 if f.since.is_some() { 536 let created_clause = format!("created_at > {}", f.since.unwrap()); 537 filter_components.push(created_clause); 538 } 539 // Query for timestamp 540 if f.until.is_some() { 541 let until_clause = format!("created_at < {}", f.until.unwrap()); 542 filter_components.push(until_clause); 543 } 544 // never display hidden events 545 query.push_str(" WHERE hidden!=TRUE"); 546 // build filter component conditions 547 if !filter_components.is_empty() { 548 query.push_str(" AND "); 549 query.push_str(&filter_components.join(" AND ")); 550 } 551 // Apply per-filter limit to this subquery. 552 // The use of a LIMIT implies a DESC order, to capture only the most recent events. 553 if let Some(lim) = f.limit { 554 let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {}", lim); 555 } else { 556 query.push_str(" ORDER BY e.created_at ASC") 557 } 558 (query, params) 559 } 560 561 /// Create a dynamic SQL query string and params from a subscription. 562 fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) { 563 // build a dynamic SQL query for an entire subscription, based on 564 // SQL subqueries for filters. 
565 let mut subqueries: Vec<String> = Vec::new(); 566 // subquery params 567 let mut params: Vec<Box<dyn ToSql>> = vec![]; 568 // for every filter in the subscription, generate a subquery 569 for f in sub.filters.iter() { 570 let (f_subquery, mut f_params) = query_from_filter(f); 571 subqueries.push(f_subquery); 572 params.append(&mut f_params); 573 } 574 // encapsulate subqueries into select statements 575 let subqueries_selects: Vec<String> = subqueries 576 .iter() 577 .map(|s| format!("SELECT content, created_at FROM ({})", s)) 578 .collect(); 579 let query: String = subqueries_selects.join(" UNION "); 580 trace!("final query string: {}", query); 581 (query, params) 582 } 583 584 /// Perform a database query using a subscription. 585 /// 586 /// The [`Subscription`] is converted into a SQL query. Each result 587 /// is published on the `query_tx` channel as it is returned. If a 588 /// message becomes available on the `abandon_query_rx` channel, the 589 /// query is immediately aborted. 590 pub async fn db_query( 591 sub: Subscription, 592 client_id: String, 593 pool: SqlitePool, 594 query_tx: tokio::sync::mpsc::Sender<QueryResult>, 595 mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, 596 ) { 597 task::spawn_blocking(move || { 598 trace!("going to query for: {:?}", sub); 599 let mut row_count: usize = 0; 600 let start = Instant::now(); 601 // generate SQL query 602 let (q, p) = query_from_sub(&sub); 603 trace!("SQL generated in {:?}", start.elapsed()); 604 // show pool stats 605 debug!("DB pool stats: {:?}", pool.state()); 606 let start = Instant::now(); 607 if let Ok(conn) = pool.get() { 608 // execute the query. Don't cache, since queries vary so much. 609 let mut stmt = conn.prepare(&q)?; 610 let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; 611 let mut first_result = true; 612 while let Some(row) = event_rows.next()? 
{ 613 if first_result { 614 debug!( 615 "time to first result: {:?} (cid={}, sub={:?})", 616 start.elapsed(), 617 client_id, 618 sub.id 619 ); 620 first_result = false; 621 } 622 // check if this is still active 623 // TODO: check every N rows 624 if abandon_query_rx.try_recv().is_ok() { 625 debug!("query aborted (sub={:?})", sub.id); 626 return Ok(()); 627 } 628 row_count += 1; 629 let event_json = row.get(0)?; 630 query_tx 631 .blocking_send(QueryResult { 632 sub_id: sub.get_id(), 633 event: event_json, 634 }) 635 .ok(); 636 } 637 query_tx 638 .blocking_send(QueryResult { 639 sub_id: sub.get_id(), 640 event: "EOSE".to_string(), 641 }) 642 .ok(); 643 debug!( 644 "query completed ({} rows) in {:?} (cid={}, sub={:?})", 645 row_count, 646 start.elapsed(), 647 client_id, 648 sub.id 649 ); 650 } else { 651 warn!("Could not get a database connection for querying"); 652 } 653 let ok: Result<()> = Ok(()); 654 ok 655 }); 656 }