diff --git a/Cargo.lock b/Cargo.lock index 0006b1c..9118d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,6 +170,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "build_const" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" + [[package]] name = "bumpalo" version = "3.7.0" @@ -251,6 +257,7 @@ dependencies = [ "opentelemetry 0.16.0", "opentelemetry-jaeger", "parking_lot", + "persy", "pretty_env_logger", "rand 0.8.4", "regex", @@ -331,6 +338,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +dependencies = [ + "build_const", +] + [[package]] name = "crc32fast" version = "1.2.1" @@ -1624,6 +1640,31 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "persy" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cc96c7b02a7abb1df713f326c4deb493ce2b72155935d3e6ed0ba24bd9e778d" +dependencies = [ + "byteorder", + "crc", + "data-encoding", + "fs2", + "linked-hash-map", + "rand 0.7.3", + "unsigned-varint", + "zigzag", +] + +[[package]] +name = "pest" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" +dependencies = [ + "ucd-trie", +] + [[package]] name = "pin-project" version = "1.0.8" @@ -3225,6 +3266,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" +[[package]] +name = "unsigned-varint" +version = "0.5.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fdeedbf205afadfe39ae559b75c3240f24e257d0ca27e85f85cb82aa19ac35" + [[package]] name = "untrusted" version = "0.7.1" @@ -3468,6 +3515,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] + [[package]] name = "zstd" version = "0.5.4+zstd.1.4.7" diff --git a/Cargo.toml b/Cargo.toml index 0290957..645e462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,9 @@ tokio = "1.8.2" sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } +# Used for storing data permanently +persy = { version = "0.11", optional = true } + # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.0.1" # Used for rocket<->ruma conversions @@ -84,6 +87,7 @@ heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c7 [features] default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] +backend_persy = ["persy"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] diff --git a/src/database.rs b/src/database.rs index 8fd745b..6f535d2 100644 --- a/src/database.rs +++ b/src/database.rs @@ -132,6 +132,9 @@ pub type Engine = abstraction::sqlite::Engine; #[cfg(feature = "heed")] pub type Engine = abstraction::heed::Engine; +#[cfg(feature = "persy")] +pub type Engine = abstraction::PersyEngine; + pub struct Database { _db: Arc, pub globals: globals::Globals, diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 5b941fb..56557e1 100644 --- a/src/database/abstraction.rs +++ 
b/src/database/abstraction.rs @@ -3,6 +3,9 @@ use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; +#[cfg(any(feature = "persy"))] +use std::{collections::BTreeMap, sync::RwLock}; + #[cfg(feature = "sled")] pub mod sled; @@ -52,3 +55,185 @@ pub trait Tree: Send + Sync { Ok(()) } } + +// This is a functional integration of Persy against the current state of the engine +// abstraction. It should work and provide a base for stability and performance testing. +// Persy should be fairly resilient to crashes and fairly lightweight in memory usage. +// Single-threaded speed will be fairly low because each transaction commit waits for data +// to be flushed to disk; multi-threaded use should perform better, though I only expect +// a speed of a few thousand transactions per second. +// +// The current design of the engine does not allow transactions that span multiple keys, +// which would reduce latency quite a lot; however, supporting transactions in the engine +// would require a massive refactor. + +#[cfg(feature = "persy")] +pub struct PersyEngine(persy::Persy); + +#[cfg(feature = "persy")] +impl DatabaseEngine for PersyEngine { + fn open(config: &Config) -> Result> { + let cfg = persy::Config::new(); + // This is for tweak the in memory cache size + //config.change_cache_size(32 * 1024 * 1024 /*32Mb*/) + + let persy = persy::OpenOptions::new() + .create(true) + .config(cfg) + .open(&config.database_path)?; + Ok(Arc::new(PersyEngine(persy))) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + // Create if it doesn't exist + if !self.0.exists_index(name)? 
{ + let mut tx = self.0.begin()?; + tx.create_index::(name, persy::ValueMode::REPLACE)?; + tx.prepare()?.commit()?; + } + + Ok(Arc::new(PersyTree { + db: self.0.clone(), + name: name.to_owned(), + watchers: RwLock::new(BTreeMap::new()), + })) + } +} + +#[cfg(feature = "persy")] +pub struct PersyTree { + db: persy::Persy, + name: String, + watchers: RwLock, Vec>>>, +} + +#[cfg(feature = "persy")] +impl Tree for PersyTree { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self + .db + .get::(&self.name, &persy::ByteVec(key.to_vec()))? + .map(|v| v.into_iter().map(|bv| bv.0).next()) + .flatten()) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + + let mut tx = self.db.begin()?; + tx.put::( + &self.name, + persy::ByteVec(key.to_owned()), + persy::ByteVec(value.to_owned()), + )?; + tx.prepare()?.commit()?; + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + let mut tx = self.db.begin()?; + tx.remove::( + &self.name, + persy::ByteVec(key.to_owned()), + None, + )?; + tx.prepare()?.commit()?; + Ok(()) + } + + fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { + Box::new( + self.db + .range::(&self.name, ..) 
+ .unwrap() + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ) + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Box<[u8]>)> + 'a> { + Box::new( + self.db + .range::( + &self.name, + persy::ByteVec(from.to_owned()).., + ) + .unwrap() + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ) + } + + fn increment(&self, key: &[u8]) -> Result> { + let old = self.get(key)?; + let new = utils::increment(old.as_deref()).unwrap(); + self.insert(key, &new)?; + Ok(new) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Box<[u8]>)> + Send + 'a> { + let range_prefix = persy::ByteVec(prefix.to_owned()); + Box::new( + self.db + .range::( + &self.name, + range_prefix.clone()..=range_prefix, + ) + .unwrap() + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ) + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} diff --git a/src/error.rs b/src/error.rs index 1ecef3a..ba5ae76 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,11 @@ pub enum Error { SqliteError { #[from] source: rusqlite::Error, + #[cfg(feature = "persy")] + #[error("There was a problem with the connection to the persy database.")] + PersyError { + #[from] + source: persy::PersyError, }, #[cfg(feature = "heed")] #[error("There was a problem with the connection to the heed database: {error}")]