From b73650f4c7067c490b15ab2f1403a3821732629d Mon Sep 17 00:00:00 2001 From: Tglman Date: Wed, 23 Jun 2021 20:51:27 +0100 Subject: [PATCH] add buffered write cache to reduce write latancy --- src/database/abstraction.rs | 355 +++++++++++++++++++++++++++++++----- 1 file changed, 314 insertions(+), 41 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index de63301..9ed7dc9 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -6,6 +6,9 @@ use std::{future::Future, pin::Pin, sync::Arc}; #[cfg(any(feature = "persy"))] use std::{collections::BTreeMap, sync::RwLock}; +#[cfg(feature = "persy")] +use std::{cmp::Ordering, collections::BTreeSet, iter::Peekable}; + #[cfg(feature = "sled")] pub mod sled; @@ -68,7 +71,7 @@ pub trait Tree: Send + Sync { // require a massive refactor. #[cfg(feature = "persy")] -pub struct PersyEngine(persy::Persy); +pub struct PersyEngine(persy::Persy, Arc>); #[cfg(feature = "persy")] impl DatabaseEngine for PersyEngine { @@ -81,7 +84,14 @@ impl DatabaseEngine for PersyEngine { .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; - Ok(Arc::new(PersyEngine(persy))) + + let write_cache = Arc::new(RwLock::new(WriteCache { + add_cache: Default::default(), + remove_cache: Default::default(), + changes_count: Default::default(), + db: persy.clone(), + })); + Ok(Arc::new(PersyEngine(persy, write_cache))) } fn open_tree(self: &Arc, name: &'static str) -> Result> { @@ -96,6 +106,7 @@ impl DatabaseEngine for PersyEngine { db: self.0.clone(), name: name.to_owned(), watchers: RwLock::new(BTreeMap::new()), + write_cache: self.1.clone(), })) } } @@ -105,16 +116,277 @@ pub struct PersyTree { db: persy::Persy, name: String, watchers: RwLock, Vec>>>, + write_cache: Arc>, +} + +#[cfg(feature = "persy")] +pub struct WriteCache { + add_cache: BTreeMap, Vec>>, + remove_cache: BTreeMap>>, + changes_count: i32, + db: persy::Persy, +} + +#[cfg(feature = "persy")] +impl WriteCache { + pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.add_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeMap::new(); + map.insert(key.to_owned(), value.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned(), value.to_owned()); + } + } + self.remove_remove(index, key)?; + self.check_and_flush()?; + Ok(()) + } + + pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.remove_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.add_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + match self.remove_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeSet::new(); + map.insert(key.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned()); + } + } + self.remove_insert(index, key)?; + self.check_and_flush()?; + Ok(()) + } + pub fn check_and_flush(&mut self) -> Result<()> { + self.changes_count += 1; + if self.changes_count > 1000 { + self.flush_changes()?; + self.changes_count = 0; + } + Ok(()) + } + + pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { + Ok(if let Some(changes) = self.add_cache.get(index) { + changes.get(key).map(|v| v.to_owned()).or(value) + } else if let Some(remove) = self.remove_cache.get(index) { + if remove.contains(key) { + None + } else { + value + } + } else { + value + }) + } + + fn flush_changes(&mut self) -> Result<()> { + use persy::ByteVec; + let mut tx = self.db.begin()?; + + for (index, changes) in &self.add_cache { + for (key, value) in changes { + tx.put::( + &index, + ByteVec(key.to_owned()), + ByteVec(value.to_owned()), + )?; + } + } + self.add_cache.clear(); + for (index, changes) in &self.remove_cache { + for key in changes { + tx.remove::(&index, ByteVec(key.to_owned()), None)?; + } + } + self.remove_cache.clear(); + tx.prepare()?.commit()?; + Ok(()) + } + + pub fn iter<'a>( + &self, + index: &str, + mut iter: Box, Box<[u8]>)> + Send + Sync + 'a>, + ) -> Box, Box<[u8]>)> + Send + Sync + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); + iter = Box::new(UnionIter::new(iter, added, false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes.clone(); + iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) + } + + iter + } + + fn iter_from<'a>( + &self, + index: &str, + from: &[u8], + backwards: bool, + mut iter: Box, Box<[u8]>)> + 'a>, + ) -> Box, Box<[u8]>)> + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let range = if backwards { + adds.range(..from.to_owned()) + } else { + adds.range(from.to_owned()..) + }; + let added = range + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Box<[u8]>)>>(); + let add_iter: Box, Box<[u8]>)>> = if backwards { + Box::new(added.into_iter().rev()) + } else { + Box::new(added.into_iter()) + }; + iter = Box::new(UnionIter::new(iter, add_iter, backwards)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let owned_from = from.to_owned(); + let to_filter = removes.iter(); + let to_filter = if backwards { + to_filter + .filter(|x| (..&owned_from).contains(x)) + .cloned() + .collect::>>() + } else { + to_filter + .filter(|x| (&owned_from..).contains(x)) + .cloned() + .collect::>>() + }; + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } + + fn scan_prefix<'a>( + &self, + index: &str, + prefix: Vec, + mut iter: Box, Box<[u8]>)> + Send + 'a>, + ) -> Box, Box<[u8]>)> + Send + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let owned_prefix = prefix.to_owned(); + let added = adds + .range(prefix.to_owned()..) + .take_while(move |(k, _)| k.starts_with(&owned_prefix)) + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Box<[u8]>)>>(); + iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes + .iter() + .filter(move |k| k.starts_with(&prefix)) + .cloned() + .collect::>>(); + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } +} + +#[cfg(feature = "persy")] +struct UnionIter, T1: Iterator, I> { + first: Peekable, + second: Peekable, + backwards: bool, +} + +#[cfg(feature = "persy")] +impl, T1: Iterator, I> UnionIter { + fn new(first: T, second: T1, backwards: bool) -> Self { + UnionIter { + first: first.peekable(), + second: second.peekable(), + backwards, + } + } +} + +#[cfg(feature = "persy")] +impl Iterator for UnionIter +where + T: Iterator, + T1: Iterator, +{ + type Item = (K, V); + fn next(&mut self) -> Option { + if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { + if self.backwards { + match f.0.cmp(&s.0) { + Ordering::Less => self.second.next(), + Ordering::Greater => self.first.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } else { + match f.0.cmp(&s.0) { + Ordering::Less => self.first.next(), + Ordering::Greater => self.second.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } + } else { + self.first.next().or_else(|| self.second.next()) + } + } } #[cfg(feature = "persy")] impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { - Ok(self + use persy::ByteVec; + let result = self .db - .get::(&self.name, &persy::ByteVec(key.to_vec()))? + .get::(&self.name, &ByteVec(key.to_vec()))? .map(|v| v.into_iter().map(|bv| bv.0).next()) - .flatten()) + .flatten(); + let result = self + .write_cache + .read() + .unwrap() + .get(&self.name, key, result)?; + Ok(result) } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { @@ -139,39 +411,35 @@ impl Tree for PersyTree { } } } - - let mut tx = self.db.begin()?; - tx.put::( - &self.name, - persy::ByteVec(key.to_owned()), - persy::ByteVec(value.to_owned()), - )?; - tx.prepare()?.commit()?; + self.write_cache + .write() + .unwrap() + .insert(self.name.clone(), key, value)?; Ok(()) } fn remove(&self, key: &[u8]) -> Result<()> { - let mut tx = self.db.begin()?; - tx.remove::( - &self.name, - persy::ByteVec(key.to_owned()), - None, - )?; - tx.prepare()?.commit()?; + self.write_cache + .write() + .unwrap() + .remove(self.name.clone(), key)?; Ok(()) } fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - Box::new( + use persy::ByteVec; + let result = Box::new( self.db - .range::(&self.name, ..) + .range::(&self.name, ..) .unwrap() .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), - ) + ); + + self.write_cache.read().unwrap().iter(&self.name, result) } fn iter_from<'a>( @@ -179,19 +447,14 @@ impl Tree for PersyTree { from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { + use persy::ByteVec; let iter = if backwards { self.db - .range::( - &self.name, - ..persy::ByteVec(from.to_owned()), - ) + .range::(&self.name, ..ByteVec(from.to_owned())) .unwrap() } else { self.db - .range::( - &self.name, - persy::ByteVec(from.to_owned()).., - ) + .range::(&self.name, ByteVec(from.to_owned())..) .unwrap() }; let map = iter.filter_map(|(k, v)| { @@ -199,11 +462,16 @@ impl Tree for PersyTree { .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }); - if backwards { + let result: Box, Box<[u8]>)>> = if backwards { Box::new(map.rev()) } else { Box::new(map) - } + }; + + self.write_cache + .read() + .unwrap() + .iter_from(&self.name, from, backwards, result) } fn increment(&self, key: &[u8]) -> Result> { @@ -217,21 +485,26 @@ impl Tree for PersyTree { &'a self, prefix: Vec, ) -> Box, Box<[u8]>)> + Send + 'a> { - let range_prefix = persy::ByteVec(prefix.to_owned()); - Box::new( + use persy::ByteVec; + + let range_prefix = ByteVec(prefix.to_owned()); + let owned_prefix = prefix.clone(); + let result = Box::new( self.db - .range::( - &self.name, - range_prefix.., - ) + .range::(&self.name, range_prefix..) .unwrap() - .take_while(move |(k, _)| k.0.starts_with(&prefix)) + .take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), - ) + ); + + self.write_cache + .read() + .unwrap() + .scan_prefix(&self.name, prefix, result) } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> {