use crate::{ database::{ abstraction::{DatabaseEngine, Tree}, Config, }, Result, }; use persy::{ByteVec, OpenOptions, Persy, ValueMode}; use std::{ cmp::Ordering, collections::{btree_map::Entry, BTreeMap, BTreeSet}, future::Future, iter::Peekable, pin::Pin, sync::{Arc, RwLock}, time::{Duration, Instant}, }; pub struct PersyEngine { persy: Persy, write_cache: Arc>, } impl DatabaseEngine for PersyEngine { fn open(config: &Config) -> Result> { let mut cfg = persy::Config::new(); cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); let persy = OpenOptions::new() .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; let write_cache_size = 1000; let write_cache_window_millisecs = 1000; let write_cache = Arc::new(RwLock::new(WriteCache::new( &persy, write_cache_size, Duration::from_millis(write_cache_window_millisecs), ))); /* use timer::Timer; let timer = Timer::new(); let timer_write_cache = write_cache.clone(); timer.schedule_repeating( chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), move || timer_write_cache.write().unwrap().flush_timed().unwrap(), ); */ Ok(Arc::new(PersyEngine { persy, write_cache })) } fn open_tree(self: &Arc, name: &'static str) -> Result> { // Create if it doesn't exist if !self.persy.exists_index(name)? { let mut tx = self.persy.begin()?; tx.create_index::(name, ValueMode::REPLACE)?; tx.prepare()?.commit()?; } Ok(Arc::new(PersyTree { persy: self.persy.clone(), name: name.to_owned(), watchers: RwLock::new(BTreeMap::new()), write_cache: self.write_cache.clone(), })) } fn flush(self: &Arc) -> Result<()> { self.write_cache.write().unwrap().flush_changes()?; Ok(()) } } pub struct PersyTree { persy: Persy, name: String, watchers: RwLock, Vec>>>, write_cache: Arc>, } pub struct WriteCache { add_cache: BTreeMap, Vec>>, remove_cache: BTreeMap>>, changes_count: u32, last_flush: Instant, persy: Persy, max_size: u32, max_time_window: Duration, } impl WriteCache { fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { Self { add_cache: Default::default(), remove_cache: Default::default(), changes_count: Default::default(), last_flush: Instant::now(), persy: persy.clone(), max_size, max_time_window, } } } impl WriteCache { pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { match self.add_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeMap::new(); map.insert(key.to_owned(), value.to_owned()); s.insert(map); } Entry::Occupied(mut o) => { o.get_mut().insert(key.to_owned(), value.to_owned()); } } self.remove_remove(index, key)?; self.check_and_flush()?; Ok(()) } pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { match self.remove_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { o.get_mut().remove(key); } } Ok(()) } pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { match self.add_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { o.get_mut().remove(key); } } Ok(()) } pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { match self.remove_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeSet::new(); map.insert(key.to_owned()); s.insert(map); } Entry::Occupied(mut o) => { o.get_mut().insert(key.to_owned()); } } self.remove_insert(index, key)?; self.check_and_flush()?; Ok(()) } pub fn check_and_flush(&mut self) -> Result<()> { self.changes_count += 1; if self.changes_count > self.max_size { self.flush_changes()?; self.changes_count = 0; } Ok(()) } pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { Ok(if let Some(changes) = self.add_cache.get(index) { changes.get(key).map(|v| v.to_owned()).or(value) } else if let Some(remove) = self.remove_cache.get(index) { if remove.contains(key) { None } else { value } } else { value }) } fn flush_changes(&mut self) -> Result<()> { let mut tx = self.persy.begin()?; for (index, changes) in &self.add_cache { for (key, value) in changes { tx.put::( &index, ByteVec(key.to_owned()), ByteVec(value.to_owned()), )?; } } self.add_cache.clear(); for (index, changes) in &self.remove_cache { for key in changes { tx.remove::(&index, ByteVec(key.to_owned()), None)?; } } self.remove_cache.clear(); tx.prepare()?.commit()?; self.last_flush = Instant::now(); Ok(()) } pub fn iter<'a>( &self, index: &str, mut iter: Box, Vec)> + Send + Sync + 'a>, ) -> Box, Vec)> + Send + Sync + 'a> { if let Some(adds) = self.add_cache.get(index) { let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); iter = Box::new(UnionIter::new(iter, added, false)) } if let Some(removes) = self.remove_cache.get(index) { let to_filter = removes.clone(); iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) } iter } fn iter_from<'a>( &self, index: &str, from: &[u8], backwards: bool, mut iter: Box, Vec)> + Send + 'a>, ) -> Box, Vec)> + Send + 'a> { if let Some(adds) = self.add_cache.get(index) { let range = if backwards { adds.range(..from.to_owned()) } else { adds.range(from.to_owned()..) }; let added = range .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) .collect::, Vec)>>(); let add_iter: Box, Vec)> + Send> = if backwards { Box::new(added.into_iter().rev()) } else { Box::new(added.into_iter()) }; iter = Box::new(UnionIter::new(iter, add_iter, backwards)) } if let Some(removes) = self.remove_cache.get(index) { let owned_from = from.to_owned(); let to_filter = removes.iter(); let to_filter = if backwards { to_filter .filter(|x| (..&owned_from).contains(x)) .cloned() .collect::>>() } else { to_filter .filter(|x| (&owned_from..).contains(x)) .cloned() .collect::>>() }; iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) } iter } fn scan_prefix<'a>( &self, index: &str, prefix: Vec, mut iter: Box, Vec)> + Send + 'a>, ) -> Box, Vec)> + Send + 'a> { if let Some(adds) = self.add_cache.get(index) { let owned_prefix = prefix.to_owned(); let added = adds .range(prefix.to_owned()..) .take_while(move |(k, _)| k.starts_with(&owned_prefix)) .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) .collect::, Vec)>>(); iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) } if let Some(removes) = self.remove_cache.get(index) { let to_filter = removes .iter() .filter(move |k| k.starts_with(&prefix)) .cloned() .collect::>>(); iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) } iter } #[allow(unused)] pub fn flush_timed(&mut self) -> Result<()> { if self.changes_count > 0 { if Instant::now() - self.last_flush > self.max_time_window { self.flush_changes()?; } } Ok(()) } } struct UnionIter, T1: Iterator, I> { first: Peekable, second: Peekable, backwards: bool, } impl, T1: Iterator, I> UnionIter { fn new(first: T, second: T1, backwards: bool) -> Self { UnionIter { first: first.peekable(), second: second.peekable(), backwards, } } } impl Iterator for UnionIter where T: Iterator, T1: Iterator, { type Item = (K, V); fn next(&mut self) -> Option { if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { if self.backwards { match f.0.cmp(&s.0) { Ordering::Less => self.second.next(), Ordering::Greater => self.first.next(), Ordering::Equal => { self.first.next(); self.second.next() } } } else { match f.0.cmp(&s.0) { Ordering::Less => self.first.next(), Ordering::Greater => self.second.next(), Ordering::Equal => { self.first.next(); self.second.next() } } } } else { self.first.next().or_else(|| self.second.next()) } } } impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { let result = self .persy .get::(&self.name, &ByteVec(key.to_vec()))? .map(|v| v.into_iter().map(|bv| bv.0).next()) .flatten(); let result = self .write_cache .read() .unwrap() .get(&self.name, key, result)?; Ok(result) } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { let watchers = self.watchers.read().unwrap(); let mut triggered = Vec::new(); for length in 0..=key.len() { if watchers.contains_key(&key[..length]) { triggered.push(&key[..length]); } } drop(watchers); if !triggered.is_empty() { let mut watchers = self.watchers.write().unwrap(); for prefix in triggered { if let Some(txs) = watchers.remove(prefix) { for tx in txs { let _ = tx.send(()); } } } } self.write_cache .write() .unwrap() .insert(self.name.clone(), key, value)?; Ok(()) } fn remove(&self, key: &[u8]) -> Result<()> { self.write_cache .write() .unwrap() .remove(self.name.clone(), key)?; Ok(()) } fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { let iter = self.persy.range::(&self.name, ..); match iter { Ok(iter) => { let result = Box::new(iter.filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() })); self.write_cache.read().unwrap().iter(&self.name, result) } Err(e) => { log::warn!("error iterating {:?}", e); Box::new(std::iter::empty()) } } } fn iter_from<'a>( &'a self, from: &[u8], backwards: bool, ) -> Box, Vec)> + Send + 'a> { let range = if backwards { self.persy .range::(&self.name, ..ByteVec(from.to_owned())) } else { self.persy .range::(&self.name, ByteVec(from.to_owned())..) }; match range { Ok(iter) => { let map = iter.filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }); let result: Box, Vec)> + Send + 'a> = if backwards { Box::new(map.rev()) } else { Box::new(map) }; self.write_cache .read() .unwrap() .iter_from(&self.name, from, backwards, result) } Err(e) => { log::warn!("error iterating with prefix {:?}", e); Box::new(std::iter::empty()) } } } fn increment(&self, key: &[u8]) -> Result> { let old = self.get(key)?; let new = crate::utils::increment(old.as_deref()).unwrap(); self.insert(key, &new)?; Ok(new) } fn scan_prefix<'a>( &'a self, prefix: Vec, ) -> Box, Vec)> + Send + 'a> { let range_prefix = ByteVec(prefix.to_owned()); let range = self .persy .range::(&self.name, range_prefix..); match range { Ok(iter) => { let owned_prefix = prefix.clone(); let result = Box::new( iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), ); self.write_cache .read() .unwrap() .scan_prefix(&self.name, prefix, result) } Err(e) => { log::warn!("error scanning prefix {:?}", e); Box::new(std::iter::empty()) } } } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { let (tx, rx) = tokio::sync::oneshot::channel(); self.watchers .write() .unwrap() .entry(prefix.to_vec()) .or_default() .push(tx); Box::pin(async move { // Tx is never destroyed rx.await.unwrap(); }) } }