From d1ca88f704bb5e265095545424f8b6008a1ac5a3 Mon Sep 17 00:00:00 2001 From: Tglman Date: Fri, 18 Jun 2021 00:38:32 +0100 Subject: [PATCH 01/15] first integration of Persy as database Engine --- Cargo.lock | 56 +++++++++++ Cargo.toml | 4 + src/database.rs | 3 + src/database/abstraction.rs | 185 ++++++++++++++++++++++++++++++++++++ src/error.rs | 5 + 5 files changed, 253 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0006b1c..9118d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,6 +170,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "build_const" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" + [[package]] name = "bumpalo" version = "3.7.0" @@ -251,6 +257,7 @@ dependencies = [ "opentelemetry 0.16.0", "opentelemetry-jaeger", "parking_lot", + "persy", "pretty_env_logger", "rand 0.8.4", "regex", @@ -331,6 +338,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +dependencies = [ + "build_const", +] + [[package]] name = "crc32fast" version = "1.2.1" @@ -1624,6 +1640,31 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "persy" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cc96c7b02a7abb1df713f326c4deb493ce2b72155935d3e6ed0ba24bd9e778d" +dependencies = [ + "byteorder", + "crc", + "data-encoding", + "fs2", + "linked-hash-map", + "rand 0.7.3", + "unsigned-varint", + "zigzag", +] + +[[package]] +name = "pest" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" +dependencies = [ + "ucd-trie", +] + [[package]] name = "pin-project" version = "1.0.8" @@ -3225,6 +3266,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" +[[package]] +name = "unsigned-varint" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fdeedbf205afadfe39ae559b75c3240f24e257d0ca27e85f85cb82aa19ac35" + [[package]] name = "untrusted" version = "0.7.1" @@ -3468,6 +3515,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] + [[package]] name = "zstd" version = "0.5.4+zstd.1.4.7" diff --git a/Cargo.toml b/Cargo.toml index 0290957..645e462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,9 @@ tokio = "1.8.2" sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } +# Used for storing data permanently +persy = { version = "0.11", optional = true } + # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.0.1" # Used for rocket<->ruma conversions @@ -84,6 +87,7 @@ heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c7 [features] default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] +backend_persy = ["persy"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] diff --git a/src/database.rs b/src/database.rs index 8fd745b..6f535d2 100644 --- a/src/database.rs +++ b/src/database.rs @@ -132,6 +132,9 @@ pub type Engine = abstraction::sqlite::Engine; #[cfg(feature = "heed")] pub type Engine = abstraction::heed::Engine; +#[cfg(feature = "persy")] +pub type Engine = abstraction::PersyEngine; + pub struct Database { _db: Arc, pub globals: globals::Globals, diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 5b941fb..56557e1 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -3,6 +3,9 @@ use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; +#[cfg(any(feature = "persy"))] +use std::{collections::BTreeMap, sync::RwLock}; + #[cfg(feature = "sled")] pub mod sled; @@ -52,3 +55,185 @@ pub trait Tree: Send + Sync { Ok(()) } } + +// This is a functional integration at the state of the art of the current +// implementations, it should work and provide the base for stability and performance +// testing, Persy should be pretty resilient to crash and pretty lightweight in memory usage +// the speed in single thread will be pretty low because each transaction commit will wait for data +// to be flushed on disk, multi-thread should guarantee better performances even though I expect +// speed of a few thousand transactions per second. +// +// The current design of the engine right now do not allow to do transactions with multiple keys +// that would allow to reduce the latency quite a lot, anyway support transaction in the engine +// require a massive refactor. + +#[cfg(feature = "persy")] +pub struct PersyEngine(persy::Persy); + +#[cfg(feature = "persy")] +impl DatabaseEngine for PersyEngine { + fn open(config: &Config) -> Result> { + let cfg = persy::Config::new(); + // This is for tweak the in memory cache size + //config.change_cache_size(32 * 1024 * 1024 /*32Mb*/) + + let persy = persy::OpenOptions::new() + .create(true) + .config(cfg) + .open(&config.database_path)?; + Ok(Arc::new(PersyEngine(persy))) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + // Create if it doesn't exist + if !self.0.exists_index(name)? { + let mut tx = self.0.begin()?; + tx.create_index::(name, persy::ValueMode::REPLACE)?; + tx.prepare()?.commit()?; + } + + Ok(Arc::new(PersyTree { + db: self.0.clone(), + name: name.to_owned(), + watchers: RwLock::new(BTreeMap::new()), + })) + } +} + +#[cfg(feature = "persy")] +pub struct PersyTree { + db: persy::Persy, + name: String, + watchers: RwLock, Vec>>>, +} + +#[cfg(feature = "persy")] +impl Tree for PersyTree { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self + .db + .get::(&self.name, &persy::ByteVec(key.to_vec()))? + .map(|v| v.into_iter().map(|bv| bv.0).next()) + .flatten()) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + + let mut tx = self.db.begin()?; + tx.put::( + &self.name, + persy::ByteVec(key.to_owned()), + persy::ByteVec(value.to_owned()), + )?; + tx.prepare()?.commit()?; + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + let mut tx = self.db.begin()?; + tx.remove::( + &self.name, + persy::ByteVec(key.to_owned()), + None, + )?; + tx.prepare()?.commit()?; + Ok(()) + } + + fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { + Box::new( + self.db + .range::(&self.name, ..) + .unwrap() + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ) + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Box<[u8]>)> + 'a> { + Box::new( + self.db + .range::( + &self.name, + persy::ByteVec(from.to_owned()).., + ) + .unwrap() + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ) + } + + fn increment(&self, key: &[u8]) -> Result> { + let old = self.get(key)?; + let new = utils::increment(old.as_deref()).unwrap(); + self.insert(key, &new)?; + Ok(new) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Box<[u8]>)> + Send + 'a> { + let range_prefix = persy::ByteVec(prefix.to_owned()); + Box::new( + self.db + .range::( + &self.name, + range_prefix.clone()..=range_prefix, + ) + .unwrap() + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ) + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} diff --git a/src/error.rs b/src/error.rs index 1ecef3a..ba5ae76 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,11 @@ pub enum Error { SqliteError { #[from] source: rusqlite::Error, + #[cfg(feature = "persy")] + #[error("There was a problem with the connection to the persy database.")] + PersyError { + #[from] + source: persy::PersyError, }, #[cfg(feature = "heed")] #[error("There was a problem with the connection to the heed database: {error}")] From 91e79bd9f4ce34a692a80149771d02ef4d95c929 Mon Sep 17 00:00:00 2001 From: Tglman Date: Fri, 18 Jun 2021 19:00:22 +0100 Subject: [PATCH 02/15] minor fix in the integration with persy --- src/database/abstraction.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 56557e1..de63301 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -80,7 +80,7 @@ impl DatabaseEngine for PersyEngine { let persy = persy::OpenOptions::new() .create(true) .config(cfg) - .open(&config.database_path)?; + .open(&format!("{}/db.persy", config.database_path))?; Ok(Arc::new(PersyEngine(persy))) } @@ -179,19 +179,31 @@ impl Tree for PersyTree { from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { - Box::new( + let iter = if backwards { + self.db + .range::( + &self.name, + ..persy::ByteVec(from.to_owned()), + ) + .unwrap() + } else { self.db .range::( &self.name, persy::ByteVec(from.to_owned()).., ) .unwrap() - .filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }), - ) + }; + let map = iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }); + if backwards { + Box::new(map.rev()) + } else { + Box::new(map) + } } fn increment(&self, key: &[u8]) -> Result> { @@ -210,9 +222,10 @@ impl Tree for PersyTree { self.db .range::( &self.name, - range_prefix.clone()..=range_prefix, + range_prefix.., ) .unwrap() + .take_while(move |(k, _)| k.0.starts_with(&prefix)) .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) From 106faff685b5e9614bc03a2da0e5b38338e47dde Mon Sep 17 00:00:00 2001 From: Tglman Date: Wed, 23 Jun 2021 20:51:27 +0100 Subject: [PATCH 03/15] add buffered write cache to reduce write latancy --- src/database/abstraction.rs | 355 +++++++++++++++++++++++++++++++----- 1 file changed, 314 insertions(+), 41 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index de63301..9ed7dc9 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -6,6 +6,9 @@ use std::{future::Future, pin::Pin, sync::Arc}; #[cfg(any(feature = "persy"))] use std::{collections::BTreeMap, sync::RwLock}; +#[cfg(feature = "persy")] +use std::{cmp::Ordering, collections::BTreeSet, iter::Peekable}; + #[cfg(feature = "sled")] pub mod sled; @@ -68,7 +71,7 @@ pub trait Tree: Send + Sync { // require a massive refactor. #[cfg(feature = "persy")] -pub struct PersyEngine(persy::Persy); +pub struct PersyEngine(persy::Persy, Arc>); #[cfg(feature = "persy")] impl DatabaseEngine for PersyEngine { @@ -81,7 +84,14 @@ impl DatabaseEngine for PersyEngine { .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; - Ok(Arc::new(PersyEngine(persy))) + + let write_cache = Arc::new(RwLock::new(WriteCache { + add_cache: Default::default(), + remove_cache: Default::default(), + changes_count: Default::default(), + db: persy.clone(), + })); + Ok(Arc::new(PersyEngine(persy, write_cache))) } fn open_tree(self: &Arc, name: &'static str) -> Result> { @@ -96,6 +106,7 @@ impl DatabaseEngine for PersyEngine { db: self.0.clone(), name: name.to_owned(), watchers: RwLock::new(BTreeMap::new()), + write_cache: self.1.clone(), })) } } @@ -105,16 +116,277 @@ pub struct PersyTree { db: persy::Persy, name: String, watchers: RwLock, Vec>>>, + write_cache: Arc>, +} + +#[cfg(feature = "persy")] +pub struct WriteCache { + add_cache: BTreeMap, Vec>>, + remove_cache: BTreeMap>>, + changes_count: i32, + db: persy::Persy, +} + +#[cfg(feature = "persy")] +impl WriteCache { + pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.add_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeMap::new(); + map.insert(key.to_owned(), value.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned(), value.to_owned()); + } + } + self.remove_remove(index, key)?; + self.check_and_flush()?; + Ok(()) + } + + pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.remove_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.add_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.remove_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeSet::new(); + map.insert(key.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned()); + } + } + self.remove_insert(index, key)?; + self.check_and_flush()?; + Ok(()) + } + pub fn check_and_flush(&mut self) -> Result<()> { + self.changes_count += 1; + if self.changes_count > 1000 { + self.flush_changes()?; + self.changes_count = 0; + } + Ok(()) + } + + pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { + Ok(if let Some(changes) = self.add_cache.get(index) { + changes.get(key).map(|v| v.to_owned()).or(value) + } else if let Some(remove) = self.remove_cache.get(index) { + if remove.contains(key) { + None + } else { + value + } + } else { + value + }) + } + + fn flush_changes(&mut self) -> Result<()> { + use persy::ByteVec; + let mut tx = self.db.begin()?; + + for (index, changes) in &self.add_cache { + for (key, value) in changes { + tx.put::( + &index, + ByteVec(key.to_owned()), + ByteVec(value.to_owned()), + )?; + } + } + self.add_cache.clear(); + for (index, changes) in &self.remove_cache { + for key in changes { + tx.remove::(&index, ByteVec(key.to_owned()), None)?; + } + } + self.remove_cache.clear(); + tx.prepare()?.commit()?; + Ok(()) + } + + pub fn iter<'a>( + &self, + index: &str, + mut iter: Box, Box<[u8]>)> + Send + Sync + 'a>, + ) -> Box, Box<[u8]>)> + Send + Sync + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); + iter = Box::new(UnionIter::new(iter, added, false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes.clone(); + iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) + } + + iter + } + + fn iter_from<'a>( + &self, + index: &str, + from: &[u8], + backwards: bool, + mut iter: Box, Box<[u8]>)> + 'a>, + ) -> Box, Box<[u8]>)> + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let range = if backwards { + adds.range(..from.to_owned()) + } else { + adds.range(from.to_owned()..) + }; + let added = range + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Box<[u8]>)>>(); + let add_iter: Box, Box<[u8]>)>> = if backwards { + Box::new(added.into_iter().rev()) + } else { + Box::new(added.into_iter()) + }; + iter = Box::new(UnionIter::new(iter, add_iter, backwards)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let owned_from = from.to_owned(); + let to_filter = removes.iter(); + let to_filter = if backwards { + to_filter + .filter(|x| (..&owned_from).contains(x)) + .cloned() + .collect::>>() + } else { + to_filter + .filter(|x| (&owned_from..).contains(x)) + .cloned() + .collect::>>() + }; + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } + + fn scan_prefix<'a>( + &self, + index: &str, + prefix: Vec, + mut iter: Box, Box<[u8]>)> + Send + 'a>, + ) -> Box, Box<[u8]>)> + Send + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let owned_prefix = prefix.to_owned(); + let added = adds + .range(prefix.to_owned()..) + .take_while(move |(k, _)| k.starts_with(&owned_prefix)) + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Box<[u8]>)>>(); + iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes + .iter() + .filter(move |k| k.starts_with(&prefix)) + .cloned() + .collect::>>(); + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } +} + +#[cfg(feature = "persy")] +struct UnionIter, T1: Iterator, I> { + first: Peekable, + second: Peekable, + backwards: bool, +} + +#[cfg(feature = "persy")] +impl, T1: Iterator, I> UnionIter { + fn new(first: T, second: T1, backwards: bool) -> Self { + UnionIter { + first: first.peekable(), + second: second.peekable(), + backwards, + } + } +} + +#[cfg(feature = "persy")] +impl Iterator for UnionIter +where + T: Iterator, + T1: Iterator, +{ + type Item = (K, V); + fn next(&mut self) -> Option { + if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { + if self.backwards { + match f.0.cmp(&s.0) { + Ordering::Less => self.second.next(), + Ordering::Greater => self.first.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } else { + match f.0.cmp(&s.0) { + Ordering::Less => self.first.next(), + Ordering::Greater => self.second.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } + } else { + self.first.next().or_else(|| self.second.next()) + } + } } #[cfg(feature = "persy")] impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { - Ok(self + use persy::ByteVec; + let result = self .db - .get::(&self.name, &persy::ByteVec(key.to_vec()))? + .get::(&self.name, &ByteVec(key.to_vec()))? .map(|v| v.into_iter().map(|bv| bv.0).next()) - .flatten()) + .flatten(); + let result = self + .write_cache + .read() + .unwrap() + .get(&self.name, key, result)?; + Ok(result) } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { @@ -139,39 +411,35 @@ impl Tree for PersyTree { } } } - - let mut tx = self.db.begin()?; - tx.put::( - &self.name, - persy::ByteVec(key.to_owned()), - persy::ByteVec(value.to_owned()), - )?; - tx.prepare()?.commit()?; + self.write_cache + .write() + .unwrap() + .insert(self.name.clone(), key, value)?; Ok(()) } fn remove(&self, key: &[u8]) -> Result<()> { - let mut tx = self.db.begin()?; - tx.remove::( - &self.name, - persy::ByteVec(key.to_owned()), - None, - )?; - tx.prepare()?.commit()?; + self.write_cache + .write() + .unwrap() + .remove(self.name.clone(), key)?; Ok(()) } fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - Box::new( + use persy::ByteVec; + let result = Box::new( self.db - .range::(&self.name, ..) + .range::(&self.name, ..) .unwrap() .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), - ) + ); + + self.write_cache.read().unwrap().iter(&self.name, result) } fn iter_from<'a>( @@ -179,19 +447,14 @@ impl Tree for PersyTree { from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { + use persy::ByteVec; let iter = if backwards { self.db - .range::( - &self.name, - ..persy::ByteVec(from.to_owned()), - ) + .range::(&self.name, ..ByteVec(from.to_owned())) .unwrap() } else { self.db - .range::( - &self.name, - persy::ByteVec(from.to_owned()).., - ) + .range::(&self.name, ByteVec(from.to_owned())..) .unwrap() }; let map = iter.filter_map(|(k, v)| { @@ -199,11 +462,16 @@ impl Tree for PersyTree { .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }); - if backwards { + let result: Box, Box<[u8]>)>> = if backwards { Box::new(map.rev()) } else { Box::new(map) - } + }; + + self.write_cache + .read() + .unwrap() + .iter_from(&self.name, from, backwards, result) } fn increment(&self, key: &[u8]) -> Result> { @@ -217,21 +485,26 @@ impl Tree for PersyTree { &'a self, prefix: Vec, ) -> Box, Box<[u8]>)> + Send + 'a> { - let range_prefix = persy::ByteVec(prefix.to_owned()); - Box::new( + use persy::ByteVec; + + let range_prefix = ByteVec(prefix.to_owned()); + let owned_prefix = prefix.clone(); + let result = Box::new( self.db - .range::( - &self.name, - range_prefix.., - ) + .range::(&self.name, range_prefix..) .unwrap() - .take_while(move |(k, _)| k.0.starts_with(&prefix)) + .take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), - ) + ); + + self.write_cache + .read() + .unwrap() + .scan_prefix(&self.name, prefix, result) } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { From 7c8b05c3bf9c21aadb29751dc98fd3b8cacef2de Mon Sep 17 00:00:00 2001 From: Tglman Date: Wed, 23 Jun 2021 21:17:51 +0100 Subject: [PATCH 04/15] add timed flush logic --- src/database/abstraction.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 9ed7dc9..2267a30 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -7,7 +7,12 @@ use std::{future::Future, pin::Pin, sync::Arc}; use std::{collections::BTreeMap, sync::RwLock}; #[cfg(feature = "persy")] -use std::{cmp::Ordering, collections::BTreeSet, iter::Peekable}; +use std::{ + cmp::Ordering, + collections::BTreeSet, + iter::Peekable, + time::{Duration, Instant}, +}; #[cfg(feature = "sled")] pub mod sled; @@ -89,6 +94,7 @@ impl DatabaseEngine for PersyEngine { add_cache: Default::default(), remove_cache: Default::default(), changes_count: Default::default(), + last_flush: Instant::now(), db: persy.clone(), })); Ok(Arc::new(PersyEngine(persy, write_cache))) @@ -124,6 +130,7 @@ pub struct WriteCache { add_cache: BTreeMap, Vec>>, remove_cache: BTreeMap>>, changes_count: i32, + last_flush: Instant, db: persy::Persy, } @@ -228,6 +235,7 @@ impl WriteCache { } self.remove_cache.clear(); tx.prepare()?.commit()?; + self.last_flush = Instant::now(); Ok(()) } @@ -318,6 +326,16 @@ impl WriteCache { } iter } + + #[allow(unused)] + pub fn flush_timed(&mut self) -> Result<()> { + if self.changes_count > 0 { + if Instant::now() - self.last_flush > Duration::from_secs(2) { + self.flush_changes()?; + } + } + Ok(()) + } } #[cfg(feature = "persy")] From f6c092a0dd2fdf4c28fd4bcd3c98a84f807e66e7 Mon Sep 17 00:00:00 2001 From: Tglman Date: Wed, 23 Jun 2021 23:47:01 +0100 Subject: [PATCH 05/15] add flush on drop to handle soft shutdown write --- src/database/abstraction.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 2267a30..88f71d9 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -338,6 +338,15 @@ impl WriteCache { } } +#[cfg(feature = "persy")] +impl Drop for WriteCache { + fn drop(&mut self) { + if self.changes_count > 0 { + self.flush_changes().unwrap(); + } + } +} + #[cfg(feature = "persy")] struct UnionIter, T1: Iterator, I> { first: Peekable, From 44bc040e2f15bfa77c6ab98655f0394f094a44bc Mon Sep 17 00:00:00 2001 From: Tglman Date: Sat, 26 Jun 2021 13:22:52 +0100 Subject: [PATCH 06/15] cleanup of persy integration, add timer for background flush of write cache --- Cargo.toml | 5 +- src/database/abstraction.rs | 119 ++++++++++++++++++++---------------- 2 files changed, 70 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 645e462..cc8f125 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,9 +28,10 @@ tokio = "1.8.2" # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } - -# Used for storing data permanently persy = { version = "0.11", optional = true } +# Used by the persy write cache for background flush +timer = "0.2" +chrono = "0.4" # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.0.1" diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 88f71d9..91bcff9 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -9,11 +9,17 @@ use std::{collections::BTreeMap, sync::RwLock}; #[cfg(feature = "persy")] use std::{ cmp::Ordering, - collections::BTreeSet, + collections::{btree_map::Entry, BTreeSet}, iter::Peekable, time::{Duration, Instant}, }; +#[cfg(feature = "persy")] +use persy::{ByteVec, OpenOptions, Persy, ValueMode}; + +#[cfg(feature = "persy")] +use timer::Timer; + #[cfg(feature = "sled")] pub mod sled; @@ -64,62 +70,64 @@ pub trait Tree: Send + Sync { } } -// This is a functional integration at the state of the art of the current -// implementations, it should work and provide the base for stability and performance -// testing, Persy should be pretty resilient to crash and pretty lightweight in memory usage -// the speed in single thread will be pretty low because each transaction commit will wait for data -// to be flushed on disk, multi-thread should guarantee better performances even though I expect -// speed of a few thousand transactions per second. -// -// The current design of the engine right now do not allow to do transactions with multiple keys -// that would allow to reduce the latency quite a lot, anyway support transaction in the engine -// require a massive refactor. - #[cfg(feature = "persy")] -pub struct PersyEngine(persy::Persy, Arc>); +pub struct PersyEngine { + persy: Persy, + write_cache: Arc>, + #[allow(unused)] + timer: Timer, +} #[cfg(feature = "persy")] impl DatabaseEngine for PersyEngine { fn open(config: &Config) -> Result> { - let cfg = persy::Config::new(); - // This is for tweak the in memory cache size - //config.change_cache_size(32 * 1024 * 1024 /*32Mb*/) + let mut cfg = persy::Config::new(); + cfg.change_cache_size(config.cache_capacity as u64); - let persy = persy::OpenOptions::new() + let persy = OpenOptions::new() .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; - - let write_cache = Arc::new(RwLock::new(WriteCache { - add_cache: Default::default(), - remove_cache: Default::default(), - changes_count: Default::default(), - last_flush: Instant::now(), - db: persy.clone(), - })); - Ok(Arc::new(PersyEngine(persy, write_cache))) + let write_cache_size = 1000; + let write_cache_window_millisecs = 1000; + let write_cache = Arc::new(RwLock::new(WriteCache::new( + &persy, + write_cache_size, + Duration::from_millis(write_cache_window_millisecs), + ))); + let timer = Timer::new(); + let timer_write_cache = write_cache.clone(); + timer.schedule_repeating( + chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), + move || timer_write_cache.write().unwrap().flush_timed().unwrap(), + ); + Ok(Arc::new(PersyEngine { + persy, + write_cache, + timer, + })) } fn open_tree(self: &Arc, name: &'static str) -> Result> { // Create if it doesn't exist - if !self.0.exists_index(name)? { - let mut tx = self.0.begin()?; - tx.create_index::(name, persy::ValueMode::REPLACE)?; + if !self.persy.exists_index(name)? { + let mut tx = self.persy.begin()?; + tx.create_index::(name, ValueMode::REPLACE)?; tx.prepare()?.commit()?; } Ok(Arc::new(PersyTree { - db: self.0.clone(), + persy: self.persy.clone(), name: name.to_owned(), watchers: RwLock::new(BTreeMap::new()), - write_cache: self.1.clone(), + write_cache: self.write_cache.clone(), })) } } #[cfg(feature = "persy")] pub struct PersyTree { - db: persy::Persy, + persy: Persy, name: String, watchers: RwLock, Vec>>>, write_cache: Arc>, @@ -129,15 +137,31 @@ pub struct PersyTree { pub struct WriteCache { add_cache: BTreeMap, Vec>>, remove_cache: BTreeMap>>, - changes_count: i32, + changes_count: u32, last_flush: Instant, - db: persy::Persy, + persy: Persy, + max_size: u32, + max_time_window: Duration, +} + +#[cfg(feature = "persy")] +impl WriteCache { + fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { + Self { + add_cache: Default::default(), + remove_cache: Default::default(), + changes_count: Default::default(), + last_flush: Instant::now(), + persy: persy.clone(), + max_size, + max_time_window, + } + } } #[cfg(feature = "persy")] impl WriteCache { pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.add_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeMap::new(); @@ -154,7 +178,6 @@ impl WriteCache { } pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.remove_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { @@ -165,7 +188,6 @@ impl WriteCache { } pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.add_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { @@ -176,7 +198,6 @@ impl WriteCache { } pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.remove_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeSet::new(); @@ -193,7 +214,7 @@ impl WriteCache { } pub fn check_and_flush(&mut self) -> Result<()> { self.changes_count += 1; - if self.changes_count > 1000 { + if self.changes_count > self.max_size { self.flush_changes()?; self.changes_count = 0; } @@ -215,8 +236,7 @@ impl WriteCache { } fn flush_changes(&mut self) -> Result<()> { - use persy::ByteVec; - let mut tx = self.db.begin()?; + let mut tx = self.persy.begin()?; for (index, changes) in &self.add_cache { for (key, value) in changes { @@ -330,7 +350,7 @@ impl WriteCache { #[allow(unused)] pub fn flush_timed(&mut self) -> Result<()> { if self.changes_count > 0 { - if Instant::now() - self.last_flush > Duration::from_secs(2) { + if Instant::now() - self.last_flush > self.max_time_window { self.flush_changes()?; } } @@ -402,9 +422,8 @@ where #[cfg(feature = "persy")] impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { - use persy::ByteVec; let result = self - .db + .persy .get::(&self.name, &ByteVec(key.to_vec()))? .map(|v| v.into_iter().map(|bv| bv.0).next()) .flatten(); @@ -454,9 +473,8 @@ impl Tree for PersyTree { } fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - use persy::ByteVec; let result = Box::new( - self.db + self.persy .range::(&self.name, ..) .unwrap() .filter_map(|(k, v)| { @@ -474,13 +492,12 @@ impl Tree for PersyTree { from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { - use persy::ByteVec; let iter = if backwards { - self.db + self.persy .range::(&self.name, ..ByteVec(from.to_owned())) .unwrap() } else { - self.db + self.persy .range::(&self.name, ByteVec(from.to_owned())..) .unwrap() }; @@ -512,12 +529,10 @@ impl Tree for PersyTree { &'a self, prefix: Vec, ) -> Box, Box<[u8]>)> + Send + 'a> { - use persy::ByteVec; - let range_prefix = ByteVec(prefix.to_owned()); let owned_prefix = prefix.clone(); let result = Box::new( - self.db + self.persy .range::(&self.name, range_prefix..) .unwrap() .take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) From 5e2d5295d9c5d82f7d5c0de80422b95aa4cc682b Mon Sep 17 00:00:00 2001 From: Tglman Date: Sat, 26 Jun 2021 13:41:32 +0100 Subject: [PATCH 07/15] add error management in the implementation --- src/database/abstraction.rs | 105 +++++++++++++++++++++--------------- 1 file changed, 62 insertions(+), 43 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 91bcff9..bfd7614 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -473,18 +473,22 @@ impl Tree for PersyTree { } fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - let result = Box::new( - self.persy - .range::(&self.name, ..) - .unwrap() - .filter_map(|(k, v)| { + let iter = self.persy.range::(&self.name, ..); + match iter { + Ok(iter) => { + let result = Box::new(iter.filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() - }), - ); + })); - self.write_cache.read().unwrap().iter(&self.name, result) + self.write_cache.read().unwrap().iter(&self.name, result) + } + Err(e) => { + log::warn!("error iterating {:?}", e); + Box::new(std::iter::empty()) + } + } } fn iter_from<'a>( @@ -492,30 +496,36 @@ impl Tree for PersyTree { from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { - let iter = if backwards { + let range = if backwards { self.persy .range::(&self.name, ..ByteVec(from.to_owned())) - .unwrap() } else { self.persy .range::(&self.name, ByteVec(from.to_owned())..) - .unwrap() - }; - let map = iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }); - let result: Box, Box<[u8]>)>> = if backwards { - Box::new(map.rev()) - } else { - Box::new(map) }; - - self.write_cache - .read() - .unwrap() - .iter_from(&self.name, from, backwards, result) + match range { + Ok(iter) => { + let map = iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }); + let result: Box, Box<[u8]>)>> = if backwards { + Box::new(map.rev()) + } else { + Box::new(map) + }; + + self.write_cache + .read() + .unwrap() + .iter_from(&self.name, from, backwards, result) + } + Err(e) => { + log::warn!("error iterating with prefix {:?}", e); + Box::new(std::iter::empty()) + } + } } fn increment(&self, key: &[u8]) -> Result> { @@ -530,23 +540,32 @@ impl Tree for PersyTree { prefix: Vec, ) -> Box, Box<[u8]>)> + Send + 'a> { let range_prefix = ByteVec(prefix.to_owned()); - let owned_prefix = prefix.clone(); - let result = Box::new( - self.persy - .range::(&self.name, range_prefix..) - .unwrap() - .take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) - .filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }), - ); - - self.write_cache - .read() - .unwrap() - .scan_prefix(&self.name, prefix, result) + let range = self + .persy + .range::(&self.name, range_prefix..); + + match range { + Ok(iter) => { + let owned_prefix = prefix.clone(); + let result = Box::new( + iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ); + + self.write_cache + .read() + .unwrap() + .scan_prefix(&self.name, prefix, result) + } + Err(e) => { + log::warn!("error scanning prefix {:?}", e); + Box::new(std::iter::empty()) + } + } } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { From d4f9f958b1f5892290cf564e633f1193e03200df Mon Sep 17 00:00:00 2001 From: Tglman Date: Thu, 15 Jul 2021 00:23:19 +0100 Subject: [PATCH 08/15] fixed compilation issues after migration --- src/database/abstraction.rs | 34 +++++++++++++++++++--------------- src/error.rs | 1 + 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index bfd7614..d89a18f 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -82,7 +82,7 @@ pub struct PersyEngine { impl DatabaseEngine for PersyEngine { fn open(config: &Config) -> Result> { let mut cfg = persy::Config::new(); - cfg.change_cache_size(config.cache_capacity as u64); + cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); let persy = OpenOptions::new() .create(true) @@ -123,6 +123,10 @@ impl DatabaseEngine for PersyEngine { write_cache: self.write_cache.clone(), })) } + + fn flush(self: &Arc) -> Result<()> { + Ok(()) + } } #[cfg(feature = "persy")] @@ -262,8 +266,8 @@ impl WriteCache { pub fn iter<'a>( &self, index: &str, - mut iter: Box, Box<[u8]>)> + Send + Sync + 'a>, - ) -> Box, Box<[u8]>)> + Send + Sync + 'a> { + mut iter: Box, Vec)> + Send + Sync + 'a>, + ) -> Box, Vec)> + Send + Sync + 'a> { if let Some(adds) = self.add_cache.get(index) { let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); iter = Box::new(UnionIter::new(iter, added, false)) @@ -282,8 +286,8 @@ impl WriteCache { index: &str, from: &[u8], backwards: bool, - mut iter: Box, Box<[u8]>)> + 'a>, - ) -> Box, Box<[u8]>)> + 'a> { + mut iter: Box, Vec)> + Send+ 'a>, + ) -> Box, Vec)> + Send + 'a> { if let Some(adds) = self.add_cache.get(index) { let range = if backwards { adds.range(..from.to_owned()) @@ -292,8 +296,8 @@ impl WriteCache { }; let added = range .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) - .collect::, Box<[u8]>)>>(); - let add_iter: Box, Box<[u8]>)>> = if backwards { + .collect::, Vec)>>(); + let add_iter: Box, Vec)> + Send> = if backwards { Box::new(added.into_iter().rev()) } else { Box::new(added.into_iter()) @@ -324,15 +328,15 @@ impl WriteCache { &self, index: &str, prefix: Vec, - mut iter: Box, Box<[u8]>)> + Send + 'a>, - ) -> Box, Box<[u8]>)> + Send + 'a> { + mut iter: Box, Vec)> + Send + 'a>, + ) -> Box, Vec)> + Send + 'a> { if let Some(adds) = self.add_cache.get(index) { let owned_prefix = prefix.to_owned(); let added = adds .range(prefix.to_owned()..) .take_while(move |(k, _)| k.starts_with(&owned_prefix)) .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) - .collect::, Box<[u8]>)>>(); + .collect::, Vec)>>(); iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) } @@ -472,7 +476,7 @@ impl Tree for PersyTree { Ok(()) } - fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { + fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { let iter = self.persy.range::(&self.name, ..); match iter { Ok(iter) => { @@ -495,7 +499,7 @@ impl Tree for PersyTree { &'a self, from: &[u8], backwards: bool, - ) -> Box, Box<[u8]>)> + 'a> { + ) -> Box, Vec)> + Send + 'a> { let range = if backwards { self.persy .range::(&self.name, ..ByteVec(from.to_owned())) @@ -510,7 +514,7 @@ impl Tree for PersyTree { .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }); - let result: Box, Box<[u8]>)>> = if backwards { + let result: Box, Vec)> + Send+'a > = if backwards { Box::new(map.rev()) } else { Box::new(map) @@ -530,7 +534,7 @@ impl Tree for PersyTree { fn increment(&self, key: &[u8]) -> Result> { let old = self.get(key)?; - let new = utils::increment(old.as_deref()).unwrap(); + let new = crate::utils::increment(old.as_deref()).unwrap(); self.insert(key, &new)?; Ok(new) } @@ -538,7 +542,7 @@ impl Tree for PersyTree { fn scan_prefix<'a>( &'a self, prefix: Vec, - ) -> Box, Box<[u8]>)> + Send + 'a> { + ) -> Box, Vec)> + Send+ 'a> { let range_prefix = ByteVec(prefix.to_owned()); let range = self .persy diff --git a/src/error.rs b/src/error.rs index ba5ae76..63d3b21 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,7 @@ pub enum Error { SqliteError { #[from] source: rusqlite::Error, + }, #[cfg(feature = "persy")] #[error("There was a problem with the connection to the persy database.")] PersyError { From d28475e402803dd6e6b3a027180e856c3e60145f Mon Sep 17 00:00:00 2001 From: Tglman Date: Thu, 15 Jul 2021 00:31:09 +0100 Subject: [PATCH 09/15] moved persy implementation to specific module --- src/database.rs | 2 +- src/database/abstraction.rs | 544 +----------------------------- src/database/abstraction/persy.rs | 526 +++++++++++++++++++++++++++++ 3 files changed, 530 insertions(+), 542 deletions(-) create mode 100644 src/database/abstraction/persy.rs diff --git a/src/database.rs b/src/database.rs index 6f535d2..d4a4c8c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -133,7 +133,7 @@ pub type Engine = abstraction::sqlite::Engine; pub type Engine = abstraction::heed::Engine; #[cfg(feature = "persy")] -pub type Engine = abstraction::PersyEngine; +pub type Engine = abstraction::persy::PersyEngine; pub struct Database { _db: Arc, diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index d89a18f..b1fd87e 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -3,32 +3,12 @@ use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; -#[cfg(any(feature = "persy"))] -use std::{collections::BTreeMap, sync::RwLock}; - -#[cfg(feature = "persy")] -use std::{ - cmp::Ordering, - collections::{btree_map::Entry, BTreeSet}, - iter::Peekable, - time::{Duration, Instant}, -}; - -#[cfg(feature = "persy")] -use persy::{ByteVec, OpenOptions, Persy, ValueMode}; - -#[cfg(feature = "persy")] -use timer::Timer; - -#[cfg(feature = "sled")] -pub mod sled; - -#[cfg(feature = "sqlite")] -pub mod sqlite; - #[cfg(feature = "heed")] pub mod heed; +#[cfg(feature = "persy")] +pub mod persy; + pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; @@ -70,521 +50,3 @@ pub trait Tree: Send + Sync { } } -#[cfg(feature = "persy")] -pub struct PersyEngine { - persy: Persy, - write_cache: Arc>, - #[allow(unused)] - timer: Timer, -} - -#[cfg(feature = "persy")] -impl DatabaseEngine for PersyEngine { - fn open(config: &Config) -> Result> { - let mut cfg = persy::Config::new(); - cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); - - let persy = OpenOptions::new() - .create(true) - .config(cfg) - .open(&format!("{}/db.persy", config.database_path))?; - let write_cache_size = 1000; - let write_cache_window_millisecs = 1000; - let write_cache = Arc::new(RwLock::new(WriteCache::new( - &persy, - write_cache_size, - Duration::from_millis(write_cache_window_millisecs), - ))); - let timer = Timer::new(); - let timer_write_cache = write_cache.clone(); - timer.schedule_repeating( - chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), - move || timer_write_cache.write().unwrap().flush_timed().unwrap(), - ); - Ok(Arc::new(PersyEngine { - persy, - write_cache, - timer, - })) - } - - fn open_tree(self: &Arc, name: &'static str) -> Result> { - // Create if it doesn't exist - if !self.persy.exists_index(name)? { - let mut tx = self.persy.begin()?; - tx.create_index::(name, ValueMode::REPLACE)?; - tx.prepare()?.commit()?; - } - - Ok(Arc::new(PersyTree { - persy: self.persy.clone(), - name: name.to_owned(), - watchers: RwLock::new(BTreeMap::new()), - write_cache: self.write_cache.clone(), - })) - } - - fn flush(self: &Arc) -> Result<()> { - Ok(()) - } -} - -#[cfg(feature = "persy")] -pub struct PersyTree { - persy: Persy, - name: String, - watchers: RwLock, Vec>>>, - write_cache: Arc>, -} - -#[cfg(feature = "persy")] -pub struct WriteCache { - add_cache: BTreeMap, Vec>>, - remove_cache: BTreeMap>>, - changes_count: u32, - last_flush: Instant, - persy: Persy, - max_size: u32, - max_time_window: Duration, -} - -#[cfg(feature = "persy")] -impl WriteCache { - fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { - Self { - add_cache: Default::default(), - remove_cache: Default::default(), - changes_count: Default::default(), - last_flush: Instant::now(), - persy: persy.clone(), - max_size, - max_time_window, - } - } -} - -#[cfg(feature = "persy")] -impl WriteCache { - pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { - match self.add_cache.entry(index.clone()) { - Entry::Vacant(s) => { - let mut map = BTreeMap::new(); - map.insert(key.to_owned(), value.to_owned()); - s.insert(map); - } - Entry::Occupied(mut o) => { - o.get_mut().insert(key.to_owned(), value.to_owned()); - } - } - self.remove_remove(index, key)?; - self.check_and_flush()?; - Ok(()) - } - - pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { - match self.remove_cache.entry(index) { - Entry::Vacant(_) => {} - Entry::Occupied(mut o) => { - o.get_mut().remove(key); - } - } - Ok(()) - } - - pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { - match self.add_cache.entry(index) { - Entry::Vacant(_) => {} - Entry::Occupied(mut o) => { - o.get_mut().remove(key); - } - } - Ok(()) - } - - pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { - match self.remove_cache.entry(index.clone()) { - Entry::Vacant(s) => { - let mut map = BTreeSet::new(); - map.insert(key.to_owned()); - s.insert(map); - } - Entry::Occupied(mut o) => { - o.get_mut().insert(key.to_owned()); - } - } - self.remove_insert(index, key)?; - self.check_and_flush()?; - Ok(()) - } - pub fn check_and_flush(&mut self) -> Result<()> { - self.changes_count += 1; - if self.changes_count > self.max_size { - self.flush_changes()?; - self.changes_count = 0; - } - Ok(()) - } - - pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { - Ok(if let Some(changes) = self.add_cache.get(index) { - changes.get(key).map(|v| v.to_owned()).or(value) - } else if let Some(remove) = self.remove_cache.get(index) { - if remove.contains(key) { - None - } else { - value - } - } else { - value - }) - } - - fn flush_changes(&mut self) -> Result<()> { - let mut tx = self.persy.begin()?; - - for (index, changes) in &self.add_cache { - for (key, value) in changes { - tx.put::( - &index, - ByteVec(key.to_owned()), - ByteVec(value.to_owned()), - )?; - } - } - self.add_cache.clear(); - for (index, changes) in &self.remove_cache { - for key in changes { - tx.remove::(&index, ByteVec(key.to_owned()), None)?; - } - } - self.remove_cache.clear(); - tx.prepare()?.commit()?; - self.last_flush = Instant::now(); - Ok(()) - } - - pub fn iter<'a>( - &self, - index: &str, - mut iter: Box, Vec)> + Send + Sync + 'a>, - ) -> Box, Vec)> + Send + Sync + 'a> { - if let Some(adds) = self.add_cache.get(index) { - let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); - iter = Box::new(UnionIter::new(iter, added, false)) - } - - if let Some(removes) = self.remove_cache.get(index) { - let to_filter = removes.clone(); - iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) - } - - iter - } - - fn iter_from<'a>( - &self, - index: &str, - from: &[u8], - backwards: bool, - mut iter: Box, Vec)> + Send+ 'a>, - ) -> Box, Vec)> + Send + 'a> { - if let Some(adds) = self.add_cache.get(index) { - let range = if backwards { - adds.range(..from.to_owned()) - } else { - adds.range(from.to_owned()..) - }; - let added = range - .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) - .collect::, Vec)>>(); - let add_iter: Box, Vec)> + Send> = if backwards { - Box::new(added.into_iter().rev()) - } else { - Box::new(added.into_iter()) - }; - iter = Box::new(UnionIter::new(iter, add_iter, backwards)) - } - - if let Some(removes) = self.remove_cache.get(index) { - let owned_from = from.to_owned(); - let to_filter = removes.iter(); - let to_filter = if backwards { - to_filter - .filter(|x| (..&owned_from).contains(x)) - .cloned() - .collect::>>() - } else { - to_filter - .filter(|x| (&owned_from..).contains(x)) - .cloned() - .collect::>>() - }; - iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) - } - iter - } - - fn scan_prefix<'a>( - &self, - index: &str, - prefix: Vec, - mut iter: Box, Vec)> + Send + 'a>, - ) -> Box, Vec)> + Send + 'a> { - if let Some(adds) = self.add_cache.get(index) { - let owned_prefix = prefix.to_owned(); - let added = adds - .range(prefix.to_owned()..) - .take_while(move |(k, _)| k.starts_with(&owned_prefix)) - .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) - .collect::, Vec)>>(); - iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) - } - - if let Some(removes) = self.remove_cache.get(index) { - let to_filter = removes - .iter() - .filter(move |k| k.starts_with(&prefix)) - .cloned() - .collect::>>(); - iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) - } - iter - } - - #[allow(unused)] - pub fn flush_timed(&mut self) -> Result<()> { - if self.changes_count > 0 { - if Instant::now() - self.last_flush > self.max_time_window { - self.flush_changes()?; - } - } - Ok(()) - } -} - -#[cfg(feature = "persy")] -impl Drop for WriteCache { - fn drop(&mut self) { - if self.changes_count > 0 { - self.flush_changes().unwrap(); - } - } -} - -#[cfg(feature = "persy")] -struct UnionIter, T1: Iterator, I> { - first: Peekable, - second: Peekable, - backwards: bool, -} - -#[cfg(feature = "persy")] -impl, T1: Iterator, I> UnionIter { - fn new(first: T, second: T1, backwards: bool) -> Self { - UnionIter { - first: first.peekable(), - second: second.peekable(), - backwards, - } - } -} - -#[cfg(feature = "persy")] -impl Iterator for UnionIter -where - T: Iterator, - T1: Iterator, -{ - type Item = (K, V); - fn next(&mut self) -> Option { - if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { - if self.backwards { - match f.0.cmp(&s.0) { - Ordering::Less => self.second.next(), - Ordering::Greater => self.first.next(), - Ordering::Equal => { - self.first.next(); - self.second.next() - } - } - } else { - match f.0.cmp(&s.0) { - Ordering::Less => self.first.next(), - Ordering::Greater => self.second.next(), - Ordering::Equal => { - self.first.next(); - self.second.next() - } - } - } - } else { - self.first.next().or_else(|| self.second.next()) - } - } -} - -#[cfg(feature = "persy")] -impl Tree for PersyTree { - fn get(&self, key: &[u8]) -> Result>> { - let result = self - .persy - .get::(&self.name, &ByteVec(key.to_vec()))? - .map(|v| v.into_iter().map(|bv| bv.0).next()) - .flatten(); - let result = self - .write_cache - .read() - .unwrap() - .get(&self.name, key, result)?; - Ok(result) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } - } - } - } - self.write_cache - .write() - .unwrap() - .insert(self.name.clone(), key, value)?; - Ok(()) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - self.write_cache - .write() - .unwrap() - .remove(self.name.clone(), key)?; - Ok(()) - } - - fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { - let iter = self.persy.range::(&self.name, ..); - match iter { - Ok(iter) => { - let result = Box::new(iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - })); - - self.write_cache.read().unwrap().iter(&self.name, result) - } - Err(e) => { - log::warn!("error iterating {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn iter_from<'a>( - &'a self, - from: &[u8], - backwards: bool, - ) -> Box, Vec)> + Send + 'a> { - let range = if backwards { - self.persy - .range::(&self.name, ..ByteVec(from.to_owned())) - } else { - self.persy - .range::(&self.name, ByteVec(from.to_owned())..) - }; - match range { - Ok(iter) => { - let map = iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }); - let result: Box, Vec)> + Send+'a > = if backwards { - Box::new(map.rev()) - } else { - Box::new(map) - }; - - self.write_cache - .read() - .unwrap() - .iter_from(&self.name, from, backwards, result) - } - Err(e) => { - log::warn!("error iterating with prefix {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn increment(&self, key: &[u8]) -> Result> { - let old = self.get(key)?; - let new = crate::utils::increment(old.as_deref()).unwrap(); - self.insert(key, &new)?; - Ok(new) - } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Vec)> + Send+ 'a> { - let range_prefix = ByteVec(prefix.to_owned()); - let range = self - .persy - .range::(&self.name, range_prefix..); - - match range { - Ok(iter) => { - let owned_prefix = prefix.clone(); - let result = Box::new( - iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) - .filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }), - ); - - self.write_cache - .read() - .unwrap() - .scan_prefix(&self.name, prefix, result) - } - Err(e) => { - log::warn!("error scanning prefix {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); - - Box::pin(async move { - // Tx is never destroyed - rx.await.unwrap(); - }) - } -} diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs new file mode 100644 index 0000000..bae43e7 --- /dev/null +++ b/src/database/abstraction/persy.rs @@ -0,0 +1,526 @@ +use persy::{ByteVec, OpenOptions, Persy, ValueMode}; +use timer::Timer; +use crate::database::abstraction::{DatabaseEngine, Tree}; +use crate::database::Config; +use crate::Result; + +use std::{ + future::Future, pin::Pin, + cmp::Ordering, + collections::{BTreeMap,btree_map::Entry, BTreeSet}, + iter::Peekable, + time::{Duration, Instant}, + sync::{RwLock, Arc} +}; + +pub struct PersyEngine { + persy: Persy, + write_cache: Arc>, + #[allow(unused)] + timer: Timer, +} + +impl DatabaseEngine for PersyEngine { + fn open(config: &Config) -> Result> { + let mut cfg = persy::Config::new(); + cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); + + let persy = OpenOptions::new() + .create(true) + .config(cfg) + .open(&format!("{}/db.persy", config.database_path))?; + let write_cache_size = 1000; + let write_cache_window_millisecs = 1000; + let write_cache = Arc::new(RwLock::new(WriteCache::new( + &persy, + write_cache_size, + Duration::from_millis(write_cache_window_millisecs), + ))); + let timer = Timer::new(); + let timer_write_cache = write_cache.clone(); + timer.schedule_repeating( + chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), + move || timer_write_cache.write().unwrap().flush_timed().unwrap(), + ); + Ok(Arc::new(PersyEngine { + persy, + write_cache, + timer, + })) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + // Create if it doesn't exist + if !self.persy.exists_index(name)? { + let mut tx = self.persy.begin()?; + tx.create_index::(name, ValueMode::REPLACE)?; + tx.prepare()?.commit()?; + } + + Ok(Arc::new(PersyTree { + persy: self.persy.clone(), + name: name.to_owned(), + watchers: RwLock::new(BTreeMap::new()), + write_cache: self.write_cache.clone(), + })) + } + + fn flush(self: &Arc) -> Result<()> { + Ok(()) + } +} + +pub struct PersyTree { + persy: Persy, + name: String, + watchers: RwLock, Vec>>>, + write_cache: Arc>, +} + +pub struct WriteCache { + add_cache: BTreeMap, Vec>>, + remove_cache: BTreeMap>>, + changes_count: u32, + last_flush: Instant, + persy: Persy, + max_size: u32, + max_time_window: Duration, +} + +impl WriteCache { + fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { + Self { + add_cache: Default::default(), + remove_cache: Default::default(), + changes_count: Default::default(), + last_flush: Instant::now(), + persy: persy.clone(), + max_size, + max_time_window, + } + } +} + +impl WriteCache { + pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { + match self.add_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeMap::new(); + map.insert(key.to_owned(), value.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned(), value.to_owned()); + } + } + self.remove_remove(index, key)?; + self.check_and_flush()?; + Ok(()) + } + + pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { + match self.remove_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { + match self.add_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { + match self.remove_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeSet::new(); + map.insert(key.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned()); + } + } + self.remove_insert(index, key)?; + self.check_and_flush()?; + Ok(()) + } + pub fn check_and_flush(&mut self) -> Result<()> { + self.changes_count += 1; + if self.changes_count > self.max_size { + self.flush_changes()?; + self.changes_count = 0; + } + Ok(()) + } + + pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { + Ok(if let Some(changes) = self.add_cache.get(index) { + changes.get(key).map(|v| v.to_owned()).or(value) + } else if let Some(remove) = self.remove_cache.get(index) { + if remove.contains(key) { + None + } else { + value + } + } else { + value + }) + } + + fn flush_changes(&mut self) -> Result<()> { + let mut tx = self.persy.begin()?; + + for (index, changes) in &self.add_cache { + for (key, value) in changes { + tx.put::( + &index, + ByteVec(key.to_owned()), + ByteVec(value.to_owned()), + )?; + } + } + self.add_cache.clear(); + for (index, changes) in &self.remove_cache { + for key in changes { + tx.remove::(&index, ByteVec(key.to_owned()), None)?; + } + } + self.remove_cache.clear(); + tx.prepare()?.commit()?; + self.last_flush = Instant::now(); + Ok(()) + } + + pub fn iter<'a>( + &self, + index: &str, + mut iter: Box, Vec)> + Send + Sync + 'a>, + ) -> Box, Vec)> + Send + Sync + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); + iter = Box::new(UnionIter::new(iter, added, false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes.clone(); + iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) + } + + iter + } + + fn iter_from<'a>( + &self, + index: &str, + from: &[u8], + backwards: bool, + mut iter: Box, Vec)> + Send+ 'a>, + ) -> Box, Vec)> + Send + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let range = if backwards { + adds.range(..from.to_owned()) + } else { + adds.range(from.to_owned()..) + }; + let added = range + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Vec)>>(); + let add_iter: Box, Vec)> + Send> = if backwards { + Box::new(added.into_iter().rev()) + } else { + Box::new(added.into_iter()) + }; + iter = Box::new(UnionIter::new(iter, add_iter, backwards)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let owned_from = from.to_owned(); + let to_filter = removes.iter(); + let to_filter = if backwards { + to_filter + .filter(|x| (..&owned_from).contains(x)) + .cloned() + .collect::>>() + } else { + to_filter + .filter(|x| (&owned_from..).contains(x)) + .cloned() + .collect::>>() + }; + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } + + fn scan_prefix<'a>( + &self, + index: &str, + prefix: Vec, + mut iter: Box, Vec)> + Send + 'a>, + ) -> Box, Vec)> + Send + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let owned_prefix = prefix.to_owned(); + let added = adds + .range(prefix.to_owned()..) + .take_while(move |(k, _)| k.starts_with(&owned_prefix)) + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Vec)>>(); + iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes + .iter() + .filter(move |k| k.starts_with(&prefix)) + .cloned() + .collect::>>(); + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } + + #[allow(unused)] + pub fn flush_timed(&mut self) -> Result<()> { + if self.changes_count > 0 { + if Instant::now() - self.last_flush > self.max_time_window { + self.flush_changes()?; + } + } + Ok(()) + } +} + +#[cfg(feature = "persy")] +impl Drop for WriteCache { + fn drop(&mut self) { + if self.changes_count > 0 { + self.flush_changes().unwrap(); + } + } +} + +#[cfg(feature = "persy")] +struct UnionIter, T1: Iterator, I> { + first: Peekable, + second: Peekable, + backwards: bool, +} + +#[cfg(feature = "persy")] +impl, T1: Iterator, I> UnionIter { + fn new(first: T, second: T1, backwards: bool) -> Self { + UnionIter { + first: first.peekable(), + second: second.peekable(), + backwards, + } + } +} + +#[cfg(feature = "persy")] +impl Iterator for UnionIter +where + T: Iterator, + T1: Iterator, +{ + type Item = (K, V); + fn next(&mut self) -> Option { + if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { + if self.backwards { + match f.0.cmp(&s.0) { + Ordering::Less => self.second.next(), + Ordering::Greater => self.first.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } else { + match f.0.cmp(&s.0) { + Ordering::Less => self.first.next(), + Ordering::Greater => self.second.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } + } else { + self.first.next().or_else(|| self.second.next()) + } + } +} + +impl Tree for PersyTree { + fn get(&self, key: &[u8]) -> Result>> { + let result = self + .persy + .get::(&self.name, &ByteVec(key.to_vec()))? + .map(|v| v.into_iter().map(|bv| bv.0).next()) + .flatten(); + let result = self + .write_cache + .read() + .unwrap() + .get(&self.name, key, result)?; + Ok(result) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + self.write_cache + .write() + .unwrap() + .insert(self.name.clone(), key, value)?; + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + self.write_cache + .write() + .unwrap() + .remove(self.name.clone(), key)?; + Ok(()) + } + + fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { + let iter = self.persy.range::(&self.name, ..); + match iter { + Ok(iter) => { + let result = Box::new(iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + })); + + self.write_cache.read().unwrap().iter(&self.name, result) + } + Err(e) => { + log::warn!("error iterating {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + Send + 'a> { + let range = if backwards { + self.persy + .range::(&self.name, ..ByteVec(from.to_owned())) + } else { + self.persy + .range::(&self.name, ByteVec(from.to_owned())..) + }; + match range { + Ok(iter) => { + let map = iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }); + let result: Box, Vec)> + Send+'a > = if backwards { + Box::new(map.rev()) + } else { + Box::new(map) + }; + + self.write_cache + .read() + .unwrap() + .iter_from(&self.name, from, backwards, result) + } + Err(e) => { + log::warn!("error iterating with prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + fn increment(&self, key: &[u8]) -> Result> { + let old = self.get(key)?; + let new = crate::utils::increment(old.as_deref()).unwrap(); + self.insert(key, &new)?; + Ok(new) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + Send+ 'a> { + let range_prefix = ByteVec(prefix.to_owned()); + let range = self + .persy + .range::(&self.name, range_prefix..); + + match range { + Ok(iter) => { + let owned_prefix = prefix.clone(); + let result = Box::new( + iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ); + + self.write_cache + .read() + .unwrap() + .scan_prefix(&self.name, prefix, result) + } + Err(e) => { + log::warn!("error scanning prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} From 50a37c7a691dc934a6dd8597c71e5018af4cfe7e Mon Sep 17 00:00:00 2001 From: Tglman Date: Thu, 15 Jul 2021 00:50:52 +0100 Subject: [PATCH 10/15] refactored to use programmatic flush instead of timed flush of the write cache --- Cargo.toml | 2 +- src/database/abstraction.rs | 1 - src/database/abstraction/persy.rs | 51 +++++++++++++------------------ 3 files changed, 22 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cc8f125..c8d84d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c7 [features] default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] -backend_persy = ["persy"] +backend_persy = ["persy","num_cpus"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index b1fd87e..4a98322 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -49,4 +49,3 @@ pub trait Tree: Send + Sync { Ok(()) } } - diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index bae43e7..5a7c181 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -1,29 +1,31 @@ +use crate::{ + database::{ + abstraction::{DatabaseEngine, Tree}, + Config, + }, + Result, +}; use persy::{ByteVec, OpenOptions, Persy, ValueMode}; -use timer::Timer; -use crate::database::abstraction::{DatabaseEngine, Tree}; -use crate::database::Config; -use crate::Result; use std::{ - future::Future, pin::Pin, cmp::Ordering, - collections::{BTreeMap,btree_map::Entry, BTreeSet}, + collections::{btree_map::Entry, BTreeMap, BTreeSet}, + future::Future, iter::Peekable, + pin::Pin, + sync::{Arc, RwLock}, time::{Duration, Instant}, - sync::{RwLock, Arc} }; pub struct PersyEngine { persy: Persy, write_cache: Arc>, - #[allow(unused)] - timer: Timer, } impl DatabaseEngine for PersyEngine { fn open(config: &Config) -> Result> { let mut cfg = persy::Config::new(); - cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); + cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); let persy = OpenOptions::new() .create(true) @@ -36,17 +38,16 @@ impl DatabaseEngine for PersyEngine { write_cache_size, Duration::from_millis(write_cache_window_millisecs), ))); + /* + use timer::Timer; let timer = Timer::new(); let timer_write_cache = write_cache.clone(); timer.schedule_repeating( chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), move || timer_write_cache.write().unwrap().flush_timed().unwrap(), ); - Ok(Arc::new(PersyEngine { - persy, - write_cache, - timer, - })) + */ + Ok(Arc::new(PersyEngine { persy, write_cache })) } fn open_tree(self: &Arc, name: &'static str) -> Result> { @@ -66,6 +67,7 @@ impl DatabaseEngine for PersyEngine { } fn flush(self: &Arc) -> Result<()> { + self.write_cache.write().unwrap().flush_changes()?; Ok(()) } } @@ -223,7 +225,7 @@ impl WriteCache { index: &str, from: &[u8], backwards: bool, - mut iter: Box, Vec)> + Send+ 'a>, + mut iter: Box, Vec)> + Send + 'a>, ) -> Box, Vec)> + Send + 'a> { if let Some(adds) = self.add_cache.get(index) { let range = if backwards { @@ -299,23 +301,12 @@ impl WriteCache { } } -#[cfg(feature = "persy")] -impl Drop for WriteCache { - fn drop(&mut self) { - if self.changes_count > 0 { - self.flush_changes().unwrap(); - } - } -} - -#[cfg(feature = "persy")] struct UnionIter, T1: Iterator, I> { first: Peekable, second: Peekable, backwards: bool, } -#[cfg(feature = "persy")] impl, T1: Iterator, I> UnionIter { fn new(first: T, second: T1, backwards: bool) -> Self { UnionIter { @@ -326,7 +317,6 @@ impl, T1: Iterator, I> UnionIter { } } -#[cfg(feature = "persy")] impl Iterator for UnionIter where T: Iterator, @@ -450,7 +440,8 @@ impl Tree for PersyTree { .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }); - let result: Box, Vec)> + Send+'a > = if backwards { + let result: Box, Vec)> + Send + 'a> = if backwards + { Box::new(map.rev()) } else { Box::new(map) @@ -478,7 +469,7 @@ impl Tree for PersyTree { fn scan_prefix<'a>( &'a self, prefix: Vec, - ) -> Box, Vec)> + Send+ 'a> { + ) -> Box, Vec)> + Send + 'a> { let range_prefix = ByteVec(prefix.to_owned()); let range = self .persy From 16ae5207548f1c9eca6e852df5cdc9a35c2af48c Mon Sep 17 00:00:00 2001 From: Tglman Date: Sat, 17 Jul 2021 18:46:11 +0100 Subject: [PATCH 11/15] removed not needed dipendency for persy feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c8d84d5..cc8f125 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c7 [features] default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] -backend_persy = ["persy","num_cpus"] +backend_persy = ["persy"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] From 081db23438ef6059b16dd12b4e3b926a522f2125 Mon Sep 17 00:00:00 2001 From: Tglman Date: Fri, 30 Jul 2021 22:24:56 +0100 Subject: [PATCH 12/15] updated persy implementation for use tracing --- Cargo.lock | 11 +++++++++++ src/database/abstraction/persy.rs | 8 +++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9118d1a..43d9857 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,6 +246,7 @@ version = "0.1.0" dependencies = [ "base64 0.13.0", "bytes", + "chrono", "crossbeam", "directories", "heed", @@ -275,6 +276,7 @@ dependencies = [ "sled", "thiserror", "threadpool", + "timer", "tokio", "tracing", "tracing-flame", @@ -2934,6 +2936,15 @@ dependencies = [ "syn", ] +[[package]] +name = "timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d42176308937165701f50638db1c31586f183f1aab416268216577aec7306b" +dependencies = [ + "chrono", +] + [[package]] name = "tinyvec" version = "1.3.1" diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index 5a7c181..ba1a1ab 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -17,6 +17,8 @@ use std::{ time::{Duration, Instant}, }; +use tracing::warn; + pub struct PersyEngine { persy: Persy, write_cache: Arc>, @@ -415,7 +417,7 @@ impl Tree for PersyTree { self.write_cache.read().unwrap().iter(&self.name, result) } Err(e) => { - log::warn!("error iterating {:?}", e); + warn!("error iterating {:?}", e); Box::new(std::iter::empty()) } } @@ -453,7 +455,7 @@ impl Tree for PersyTree { .iter_from(&self.name, from, backwards, result) } Err(e) => { - log::warn!("error iterating with prefix {:?}", e); + warn!("error iterating with prefix {:?}", e); Box::new(std::iter::empty()) } } @@ -493,7 +495,7 @@ impl Tree for PersyTree { .scan_prefix(&self.name, prefix, result) } Err(e) => { - log::warn!("error scanning prefix {:?}", e); + warn!("error scanning prefix {:?}", e); Box::new(std::iter::empty()) } } From 67f1a2172666bea853ebcc0438c31f5ec7062dd6 Mon Sep 17 00:00:00 2001 From: Tglman Date: Wed, 4 Aug 2021 23:10:50 +0100 Subject: [PATCH 13/15] updated to the last database integration logic of persy --- src/database/abstraction/persy.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index ba1a1ab..d594811 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -396,6 +396,16 @@ impl Tree for PersyTree { Ok(()) } + #[tracing::instrument(skip(self, iter))] + fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { + //TODO: evaluate if use instead a single big transaction + for (key, value) in iter { + self.insert(&key, &value)?; + } + + Ok(()) + } + fn remove(&self, key: &[u8]) -> Result<()> { self.write_cache .write() @@ -404,7 +414,7 @@ impl Tree for PersyTree { Ok(()) } - fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { + fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { let iter = self.persy.range::(&self.name, ..); match iter { Ok(iter) => { @@ -427,7 +437,7 @@ impl Tree for PersyTree { &'a self, from: &[u8], backwards: bool, - ) -> Box, Vec)> + Send + 'a> { + ) -> Box, Vec)> + 'a> { let range = if backwards { self.persy .range::(&self.name, ..ByteVec(from.to_owned())) @@ -471,7 +481,7 @@ impl Tree for PersyTree { fn scan_prefix<'a>( &'a self, prefix: Vec, - ) -> Box, Vec)> + Send + 'a> { + ) -> Box, Vec)> + 'a> { let range_prefix = ByteVec(prefix.to_owned()); let range = self .persy From ef5359c44e0eee6e85dab788045df7b035ed12c8 Mon Sep 17 00:00:00 2001 From: Tglman Date: Sat, 21 Aug 2021 16:54:48 +0100 Subject: [PATCH 14/15] readd modules removed by mistake --- src/database/abstraction.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 4a98322..0dda363 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -3,6 +3,12 @@ use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; +#[cfg(feature = "sled")] +pub mod sled; + +#[cfg(feature = "sqlite")] +pub mod sqlite; + #[cfg(feature = "heed")] pub mod heed; From 37471a063fc2bca3ef42d6cc2eeaf91f85c3037b Mon Sep 17 00:00:00 2001 From: Tglman Date: Tue, 24 Aug 2021 19:29:48 +0100 Subject: [PATCH 15/15] updated to persy 1.0 --- Cargo.lock | 29 +++++++++++++------------ Cargo.toml | 2 +- src/database/abstraction/persy.rs | 36 ++++++++++++++++++------------- src/error.rs | 14 ++++++++---- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43d9857..1843139 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,12 +170,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "build_const" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" - [[package]] name = "bumpalo" version = "3.7.0" @@ -342,13 +336,19 @@ dependencies = [ [[package]] name = "crc" -version = "1.8.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +checksum = "10c2722795460108a7872e1cd933a85d6ec38abc4baecad51028f702da28889f" dependencies = [ - "build_const", + "crc-catalog", ] +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc32fast" version = "1.2.1" @@ -1644,16 +1644,17 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "persy" -version = "0.11.13" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc96c7b02a7abb1df713f326c4deb493ce2b72155935d3e6ed0ba24bd9e778d" +checksum = "de4cd9bda96e9bab3c961620ca512def7a7a880152780132632506abe4414458" dependencies = [ "byteorder", "crc", "data-encoding", "fs2", "linked-hash-map", - "rand 0.7.3", + "rand 0.8.4", + "thiserror", "unsigned-varint", "zigzag", ] @@ -3279,9 +3280,9 @@ checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" [[package]] name = "unsigned-varint" -version = "0.5.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7fdeedbf205afadfe39ae559b75c3240f24e257d0ca27e85f85cb82aa19ac35" +checksum = "5f8d425fafb8cd76bc3f22aace4af471d3156301d7508f2107e98fbeae10bc7f" [[package]] name = "untrusted" diff --git a/Cargo.toml b/Cargo.toml index cc8f125..659a50a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ tokio = "1.8.2" # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } -persy = { version = "0.11", optional = true } +persy = { version = "1.0", optional = true } # Used by the persy write cache for background flush timer = "0.2" chrono = "0.4" diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index d594811..9d3ca26 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -56,7 +56,7 @@ impl DatabaseEngine for PersyEngine { // Create if it doesn't exist if !self.persy.exists_index(name)? { let mut tx = self.persy.begin()?; - tx.create_index::(name, ValueMode::REPLACE)?; + tx.create_index::(name, ValueMode::Replace)?; tx.prepare()?.commit()?; } @@ -187,15 +187,15 @@ impl WriteCache { for (key, value) in changes { tx.put::( &index, - ByteVec(key.to_owned()), - ByteVec(value.to_owned()), + ByteVec::new(key.to_owned()), + ByteVec::new(value.to_owned()), )?; } } self.add_cache.clear(); for (index, changes) in &self.remove_cache { for key in changes { - tx.remove::(&index, ByteVec(key.to_owned()), None)?; + tx.remove::(&index, ByteVec::new(key.to_owned()), None)?; } } self.remove_cache.clear(); @@ -356,9 +356,9 @@ impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { let result = self .persy - .get::(&self.name, &ByteVec(key.to_vec()))? - .map(|v| v.into_iter().map(|bv| bv.0).next()) - .flatten(); + .get::(&self.name, &ByteVec::new(key.to_vec()))? + .next() + .map(|v| (*v).to_owned()); let result = self .write_cache .read() @@ -396,7 +396,6 @@ impl Tree for PersyTree { Ok(()) } - #[tracing::instrument(skip(self, iter))] fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { //TODO: evaluate if use instead a single big transaction for (key, value) in iter { @@ -406,6 +405,13 @@ impl Tree for PersyTree { Ok(()) } + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + for key in iter { + self.increment(&key)?; + } + Ok(()) + } + fn remove(&self, key: &[u8]) -> Result<()> { self.write_cache .write() @@ -420,7 +426,7 @@ impl Tree for PersyTree { Ok(iter) => { let result = Box::new(iter.filter_map(|(k, v)| { v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) .next() })); @@ -440,16 +446,16 @@ impl Tree for PersyTree { ) -> Box, Vec)> + 'a> { let range = if backwards { self.persy - .range::(&self.name, ..ByteVec(from.to_owned())) + .range::(&self.name, ..ByteVec::new(from.to_owned())) } else { self.persy - .range::(&self.name, ByteVec(from.to_owned())..) + .range::(&self.name, ByteVec::new(from.to_owned())..) }; match range { Ok(iter) => { let map = iter.filter_map(|(k, v)| { v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) .next() }); let result: Box, Vec)> + Send + 'a> = if backwards @@ -482,7 +488,7 @@ impl Tree for PersyTree { &'a self, prefix: Vec, ) -> Box, Vec)> + 'a> { - let range_prefix = ByteVec(prefix.to_owned()); + let range_prefix = ByteVec::new(prefix.to_owned()); let range = self .persy .range::(&self.name, range_prefix..); @@ -491,10 +497,10 @@ impl Tree for PersyTree { Ok(iter) => { let owned_prefix = prefix.clone(); let result = Box::new( - iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) + iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix)) .filter_map(|(k, v)| { v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) .next() }), ); diff --git a/src/error.rs b/src/error.rs index 63d3b21..ef135fe 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,3 +1,4 @@ +use persy::PersyError; use ruma::{ api::client::{ error::{Error as RumaError, ErrorKind}, @@ -38,10 +39,7 @@ pub enum Error { }, #[cfg(feature = "persy")] #[error("There was a problem with the connection to the persy database.")] - PersyError { - #[from] - source: persy::PersyError, - }, + PersyError { source: persy::PersyError }, #[cfg(feature = "heed")] #[error("There was a problem with the connection to the heed database: {error}")] HeedError { error: String }, @@ -142,3 +140,11 @@ where self.to_response().respond_to(r) } } + +impl> From> for Error { + fn from(err: persy::PE) -> Self { + Error::PersyError { + source: err.error().into(), + } + } +}