commit 48b64361e48fea94b3d4ada458c7dad7346e181d
parent 2e48b8b18b7c67030eacec09574ba35ba619c603
Author: William Casarin <jb55@jb55.com>
Date: Tue, 6 Feb 2024 19:22:19 -0800
ndb: add {wait,poll}_for_notes
Add an async function that blocks until new note_ids are received
Also add a function that polls for notes.
Diffstat:
4 files changed, 105 insertions(+), 5 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
@@ -22,6 +22,7 @@ env_logger = "0.10.1"
flatbuffers = "23.5.26"
libc = "0.2.151"
log = "0.4.20"
+tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
[dev-dependencies]
hex = "0.4.3"
diff --git a/src/error.rs b/src/error.rs
@@ -7,6 +7,7 @@ pub enum Error {
DecodeError,
NoteProcessFailed,
TransactionFailed,
+ SubscriptionError,
}
impl fmt::Display for Error {
@@ -17,6 +18,7 @@ impl fmt::Display for Error {
Error::DecodeError => "Decode error",
Error::NoteProcessFailed => "Note process failed",
Error::TransactionFailed => "Transaction failed",
+ Error::SubscriptionError => "Subscription failed",
};
write!(f, "{}", s)
}
diff --git a/src/lib.rs b/src/lib.rs
@@ -11,6 +11,7 @@ mod ndb_profile;
mod block;
mod config;
mod error;
+mod filter;
mod ndb;
mod note;
mod profile;
@@ -20,6 +21,7 @@ mod transaction;
pub use block::{Block, BlockType, Blocks, Mention};
pub use config::Config;
pub use error::Error;
+pub use filter::Filter;
pub use ndb::Ndb;
pub use note::Note;
pub use profile::ProfileRecord;
diff --git a/src/ndb.rs b/src/ndb.rs
@@ -3,10 +3,12 @@ use std::ffi::CString;
use std::ptr;
use crate::bindings;
-use crate::{Blocks, Config, Error, Note, ProfileRecord, Result, Transaction};
+use crate::{Blocks, Config, Error, Filter, Note, ProfileRecord, Result, Transaction};
use std::fs;
+use std::os::raw::c_int;
use std::path::Path;
use std::sync::Arc;
+use tokio::task; // Make sure to import the task module
#[derive(Debug)]
struct NdbRef {
@@ -78,6 +80,64 @@ impl Ndb {
Ok(())
}
+ pub fn subscribe(&self, filter: Filter) -> Result<u64> {
+ unsafe {
+ let res = bindings::ndb_subscribe(self.as_ptr(), filter.as_mut_ptr(), 1);
+ if res == 0 {
+ Err(Error::SubscriptionError)
+ } else {
+ Ok(res)
+ }
+ }
+ }
+
+ pub fn poll_for_notes(&self, sub_id: u64, max_notes: u32) -> Vec<u64> {
+ let mut vec = vec![];
+ vec.reserve_exact(max_notes as usize);
+
+ let res = unsafe {
+ let res = bindings::ndb_poll_for_notes(
+ self.as_ptr(),
+ sub_id,
+ vec.as_mut_ptr(),
+ max_notes as c_int,
+ );
+ vec.set_len(res as usize);
+ };
+
+ vec
+ }
+
+ pub async fn wait_for_notes(&self, sub_id: u64, max_notes: u32) -> Result<Vec<u64>> {
+ let ndb = self.clone();
+ let handle = task::spawn_blocking(move || {
+ let mut vec = vec![];
+ vec.reserve_exact(max_notes as usize);
+ let res = unsafe {
+ bindings::ndb_wait_for_notes(
+ ndb.as_ptr(),
+ sub_id,
+ vec.as_mut_ptr(),
+ max_notes as c_int,
+ )
+ };
+ if res == 0 {
+ Err(Error::SubscriptionError)
+ } else {
+ unsafe {
+ vec.set_len(res as usize);
+ };
+ Ok(vec)
+ }
+ });
+
+ match handle.await {
+ Ok(Ok(res)) => Ok(res),
+ Ok(Err(err)) => Err(err),
+ Err(_) => Err(Error::SubscriptionError),
+ }
+ }
+
pub fn get_profile_by_pubkey<'a>(
&self,
transaction: &'a Transaction,
@@ -177,18 +237,55 @@ mod tests {
#[test]
fn ndb_init_works() {
let db = "target/testdbs/init_works";
+ test_util::cleanup_db(db);
{
let cfg = Config::new();
let _ = Ndb::new(db, &cfg).expect("ok");
}
+ }
- test_util::cleanup_db(db);
+ #[tokio::test]
+ async fn subscribe_event_works() {
+ let db = "target/testdbs/subscribe";
+ test_util::cleanup_db(&db);
+
+ {
+ let ndb = Ndb::new(db, &Config::new()).expect("ndb");
+ let filter = Filter::new().kinds(vec![1]);
+ let sub_id = ndb.subscribe(filter).expect("sub_id");
+ let waiter = ndb.wait_for_notes(sub_id, 1);
+ ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
+ let res = waiter.await.expect("await ok");
+ assert_eq!(res, vec![1]);
+ }
+ }
+
+ #[test]
+ fn poll_note_works() {
+ let db = "target/testdbs/poll";
+ test_util::cleanup_db(&db);
+
+ {
+ let ndb = Ndb::new(db, &Config::new()).expect("ndb");
+ let filter = Filter::new().kinds(vec![1]);
+ let sub_id = ndb.subscribe(filter).expect("sub_id");
+ ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
+ // this is too fast, we should have nothing
+ let res = ndb.poll_for_notes(sub_id, 1);
+ assert_eq!(res, vec![]);
+
+ std::thread::sleep(std::time::Duration::from_millis(100));
+ // now we should have something
+ let res = ndb.poll_for_notes(sub_id, 1);
+ assert_eq!(res, vec![1]);
+ }
}
#[test]
fn process_event_works() {
let db = "target/testdbs/event_works";
+ test_util::cleanup_db(&db);
{
let ndb = Ndb::new(db, &Config::new()).expect("ndb");
@@ -203,9 +300,7 @@ mod tests {
let mut txn = Transaction::new(&ndb).expect("txn");
let id_bytes: [u8; 32] = id.try_into().expect("id bytes");
let note = ndb.get_note_by_id(&mut txn, &id_bytes).expect("note");
- assert!(note.kind() == 1);
+ assert_eq!(note.kind(), 1);
}
-
- test_util::cleanup_db(&db);
}
}