diff --git a/Cargo.toml b/Cargo.toml index 35ad473..126424e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,9 +28,10 @@ tokio = "1.11.0" # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } - -# Used for storing data permanently persy = { version = "0.11", optional = true } +# Used by the persy write cache for background flush +timer = "0.2" +chrono = "0.4" # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0" diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 88f71d9..91bcff9 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -9,11 +9,17 @@ use std::{collections::BTreeMap, sync::RwLock}; #[cfg(feature = "persy")] use std::{ cmp::Ordering, - collections::BTreeSet, + collections::{btree_map::Entry, BTreeSet}, iter::Peekable, time::{Duration, Instant}, }; +#[cfg(feature = "persy")] +use persy::{ByteVec, OpenOptions, Persy, ValueMode}; + +#[cfg(feature = "persy")] +use timer::Timer; + #[cfg(feature = "sled")] pub mod sled; @@ -64,62 +70,64 @@ pub trait Tree: Send + Sync { } } -// This is a functional integration at the state of the art of the current -// implementations, it should work and provide the base for stability and performance -// testing, Persy should be pretty resilient to crash and pretty lightweight in memory usage -// the speed in single thread will be pretty low because each transaction commit will wait for data -// to be flushed on disk, multi-thread should guarantee better performances even though I expect -// speed of a few thousand transactions per second. -// -// The current design of the engine right now do not allow to do transactions with multiple keys -// that would allow to reduce the latency quite a lot, anyway support transaction in the engine -// require a massive refactor. - #[cfg(feature = "persy")] -pub struct PersyEngine(persy::Persy, Arc>); +pub struct PersyEngine { + persy: Persy, + write_cache: Arc>, + #[allow(unused)] + timer: Timer, +} #[cfg(feature = "persy")] impl DatabaseEngine for PersyEngine { fn open(config: &Config) -> Result> { - let cfg = persy::Config::new(); - // This is for tweak the in memory cache size - //config.change_cache_size(32 * 1024 * 1024 /*32Mb*/) + let mut cfg = persy::Config::new(); + cfg.change_cache_size(config.cache_capacity as u64); - let persy = persy::OpenOptions::new() + let persy = OpenOptions::new() .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; - - let write_cache = Arc::new(RwLock::new(WriteCache { - add_cache: Default::default(), - remove_cache: Default::default(), - changes_count: Default::default(), - last_flush: Instant::now(), - db: persy.clone(), - })); - Ok(Arc::new(PersyEngine(persy, write_cache))) + let write_cache_size = 1000; + let write_cache_window_millisecs = 1000; + let write_cache = Arc::new(RwLock::new(WriteCache::new( + &persy, + write_cache_size, + Duration::from_millis(write_cache_window_millisecs), + ))); + let timer = Timer::new(); + let timer_write_cache = write_cache.clone(); + timer.schedule_repeating( + chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), + move || timer_write_cache.write().unwrap().flush_timed().unwrap(), + ); + Ok(Arc::new(PersyEngine { + persy, + write_cache, + timer, + })) } fn open_tree(self: &Arc, name: &'static str) -> Result> { // Create if it doesn't exist - if !self.0.exists_index(name)? { - let mut tx = self.0.begin()?; - tx.create_index::(name, persy::ValueMode::REPLACE)?; + if !self.persy.exists_index(name)? { + let mut tx = self.persy.begin()?; + tx.create_index::(name, ValueMode::REPLACE)?; tx.prepare()?.commit()?; } Ok(Arc::new(PersyTree { - db: self.0.clone(), + persy: self.persy.clone(), name: name.to_owned(), watchers: RwLock::new(BTreeMap::new()), - write_cache: self.1.clone(), + write_cache: self.write_cache.clone(), })) } } #[cfg(feature = "persy")] pub struct PersyTree { - db: persy::Persy, + persy: Persy, name: String, watchers: RwLock, Vec>>>, write_cache: Arc>, @@ -129,15 +137,31 @@ pub struct PersyTree { pub struct WriteCache { add_cache: BTreeMap, Vec>>, remove_cache: BTreeMap>>, - changes_count: i32, + changes_count: u32, last_flush: Instant, - db: persy::Persy, + persy: Persy, + max_size: u32, + max_time_window: Duration, +} + +#[cfg(feature = "persy")] +impl WriteCache { + fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { + Self { + add_cache: Default::default(), + remove_cache: Default::default(), + changes_count: Default::default(), + last_flush: Instant::now(), + persy: persy.clone(), + max_size, + max_time_window, + } + } } #[cfg(feature = "persy")] impl WriteCache { pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.add_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeMap::new(); @@ -154,7 +178,6 @@ impl WriteCache { } pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.remove_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { @@ -165,7 +188,6 @@ impl WriteCache { } pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.add_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { @@ -176,7 +198,6 @@ impl WriteCache { } pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; match self.remove_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeSet::new(); @@ -193,7 +214,7 @@ impl WriteCache { } pub fn check_and_flush(&mut self) -> Result<()> { self.changes_count += 1; - if self.changes_count > 1000 { + if self.changes_count > self.max_size { self.flush_changes()?; self.changes_count = 0; } @@ -215,8 +236,7 @@ impl WriteCache { } fn flush_changes(&mut self) -> Result<()> { - use persy::ByteVec; - let mut tx = self.db.begin()?; + let mut tx = self.persy.begin()?; for (index, changes) in &self.add_cache { for (key, value) in changes { @@ -330,7 +350,7 @@ impl WriteCache { #[allow(unused)] pub fn flush_timed(&mut self) -> Result<()> { if self.changes_count > 0 { - if Instant::now() - self.last_flush > Duration::from_secs(2) { + if Instant::now() - self.last_flush > self.max_time_window { self.flush_changes()?; } } @@ -402,9 +422,8 @@ where #[cfg(feature = "persy")] impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { - use persy::ByteVec; let result = self - .db + .persy .get::(&self.name, &ByteVec(key.to_vec()))? .map(|v| v.into_iter().map(|bv| bv.0).next()) .flatten(); @@ -454,9 +473,8 @@ impl Tree for PersyTree { } fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - use persy::ByteVec; let result = Box::new( - self.db + self.persy .range::(&self.name, ..) .unwrap() .filter_map(|(k, v)| { @@ -474,13 +492,12 @@ impl Tree for PersyTree { from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { - use persy::ByteVec; let iter = if backwards { - self.db + self.persy .range::(&self.name, ..ByteVec(from.to_owned())) .unwrap() } else { - self.db + self.persy .range::(&self.name, ByteVec(from.to_owned())..) .unwrap() }; @@ -512,12 +529,10 @@ impl Tree for PersyTree { &'a self, prefix: Vec, ) -> Box, Box<[u8]>)> + Send + 'a> { - use persy::ByteVec; - let range_prefix = ByteVec(prefix.to_owned()); let owned_prefix = prefix.clone(); let result = Box::new( - self.db + self.persy .range::(&self.name, range_prefix..) .unwrap() .take_while(move |(k, _)| k.0.starts_with(&owned_prefix))