use super::Config; use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; #[cfg(any(feature = "persy"))] use std::{collections::BTreeMap, sync::RwLock}; #[cfg(feature = "persy")] use std::{ cmp::Ordering, collections::{btree_map::Entry, BTreeSet}, iter::Peekable, time::{Duration, Instant}, }; #[cfg(feature = "persy")] use persy::{ByteVec, OpenOptions, Persy, ValueMode}; #[cfg(feature = "persy")] use timer::Timer; #[cfg(feature = "sled")] pub mod sled; #[cfg(feature = "sqlite")] pub mod sqlite; #[cfg(feature = "heed")] pub mod heed; pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; fn flush(self: &Arc) -> Result<()>; } pub trait Tree: Send + Sync { fn get(&self, key: &[u8]) -> Result>>; fn insert(&self, key: &[u8], value: &[u8]) -> Result<()>; fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()>; fn remove(&self, key: &[u8]) -> Result<()>; fn iter<'a>(&'a self) -> Box, Vec)> + 'a>; fn iter_from<'a>( &'a self, from: &[u8], backwards: bool, ) -> Box, Vec)> + 'a>; fn increment(&self, key: &[u8]) -> Result>; fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()>; fn scan_prefix<'a>( &'a self, prefix: Vec, ) -> Box, Vec)> + 'a>; fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>>; fn clear(&self) -> Result<()> { for (key, _) in self.iter() { self.remove(&key)?; } Ok(()) } } #[cfg(feature = "persy")] pub struct PersyEngine { persy: Persy, write_cache: Arc>, #[allow(unused)] timer: Timer, } #[cfg(feature = "persy")] impl DatabaseEngine for PersyEngine { fn open(config: &Config) -> Result> { let mut cfg = persy::Config::new(); cfg.change_cache_size(config.cache_capacity as u64); let persy = OpenOptions::new() .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; let write_cache_size = 1000; let write_cache_window_millisecs = 1000; let write_cache = Arc::new(RwLock::new(WriteCache::new( &persy, write_cache_size, Duration::from_millis(write_cache_window_millisecs), ))); let timer = Timer::new(); let timer_write_cache = write_cache.clone(); timer.schedule_repeating( chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), move || timer_write_cache.write().unwrap().flush_timed().unwrap(), ); Ok(Arc::new(PersyEngine { persy, write_cache, timer, })) } fn open_tree(self: &Arc, name: &'static str) -> Result> { // Create if it doesn't exist if !self.persy.exists_index(name)? { let mut tx = self.persy.begin()?; tx.create_index::(name, ValueMode::REPLACE)?; tx.prepare()?.commit()?; } Ok(Arc::new(PersyTree { persy: self.persy.clone(), name: name.to_owned(), watchers: RwLock::new(BTreeMap::new()), write_cache: self.write_cache.clone(), })) } } #[cfg(feature = "persy")] pub struct PersyTree { persy: Persy, name: String, watchers: RwLock, Vec>>>, write_cache: Arc>, } #[cfg(feature = "persy")] pub struct WriteCache { add_cache: BTreeMap, Vec>>, remove_cache: BTreeMap>>, changes_count: u32, last_flush: Instant, persy: Persy, max_size: u32, max_time_window: Duration, } #[cfg(feature = "persy")] impl WriteCache { fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { Self { add_cache: Default::default(), remove_cache: Default::default(), changes_count: Default::default(), last_flush: Instant::now(), persy: persy.clone(), max_size, max_time_window, } } } #[cfg(feature = "persy")] impl WriteCache { pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { match self.add_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeMap::new(); map.insert(key.to_owned(), value.to_owned()); s.insert(map); } Entry::Occupied(mut o) => { o.get_mut().insert(key.to_owned(), value.to_owned()); } } self.remove_remove(index, key)?; self.check_and_flush()?; Ok(()) } pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { match self.remove_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { o.get_mut().remove(key); } } Ok(()) } pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { match self.add_cache.entry(index) { Entry::Vacant(_) => {} Entry::Occupied(mut o) => { o.get_mut().remove(key); } } Ok(()) } pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { match self.remove_cache.entry(index.clone()) { Entry::Vacant(s) => { let mut map = BTreeSet::new(); map.insert(key.to_owned()); s.insert(map); } Entry::Occupied(mut o) => { o.get_mut().insert(key.to_owned()); } } self.remove_insert(index, key)?; self.check_and_flush()?; Ok(()) } pub fn check_and_flush(&mut self) -> Result<()> { self.changes_count += 1; if self.changes_count > self.max_size { self.flush_changes()?; self.changes_count = 0; } Ok(()) } pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { Ok(if let Some(changes) = self.add_cache.get(index) { changes.get(key).map(|v| v.to_owned()).or(value) } else if let Some(remove) = self.remove_cache.get(index) { if remove.contains(key) { None } else { value } } else { value }) } fn flush_changes(&mut self) -> Result<()> { let mut tx = self.persy.begin()?; for (index, changes) in &self.add_cache { for (key, value) in changes { tx.put::( &index, ByteVec(key.to_owned()), ByteVec(value.to_owned()), )?; } } self.add_cache.clear(); for (index, changes) in &self.remove_cache { for key in changes { tx.remove::(&index, ByteVec(key.to_owned()), None)?; } } self.remove_cache.clear(); tx.prepare()?.commit()?; self.last_flush = Instant::now(); Ok(()) } pub fn iter<'a>( &self, index: &str, mut iter: Box, Box<[u8]>)> + Send + Sync + 'a>, ) -> Box, Box<[u8]>)> + Send + Sync + 'a> { if let Some(adds) = self.add_cache.get(index) { let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); iter = Box::new(UnionIter::new(iter, added, false)) } if let Some(removes) = self.remove_cache.get(index) { let to_filter = removes.clone(); iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) } iter } fn iter_from<'a>( &self, index: &str, from: &[u8], backwards: bool, mut iter: Box, Box<[u8]>)> + 'a>, ) -> Box, Box<[u8]>)> + 'a> { if let Some(adds) = self.add_cache.get(index) { let range = if backwards { adds.range(..from.to_owned()) } else { adds.range(from.to_owned()..) }; let added = range .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) .collect::, Box<[u8]>)>>(); let add_iter: Box, Box<[u8]>)>> = if backwards { Box::new(added.into_iter().rev()) } else { Box::new(added.into_iter()) }; iter = Box::new(UnionIter::new(iter, add_iter, backwards)) } if let Some(removes) = self.remove_cache.get(index) { let owned_from = from.to_owned(); let to_filter = removes.iter(); let to_filter = if backwards { to_filter .filter(|x| (..&owned_from).contains(x)) .cloned() .collect::>>() } else { to_filter .filter(|x| (&owned_from..).contains(x)) .cloned() .collect::>>() }; iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) } iter } fn scan_prefix<'a>( &self, index: &str, prefix: Vec, mut iter: Box, Box<[u8]>)> + Send + 'a>, ) -> Box, Box<[u8]>)> + Send + 'a> { if let Some(adds) = self.add_cache.get(index) { let owned_prefix = prefix.to_owned(); let added = adds .range(prefix.to_owned()..) .take_while(move |(k, _)| k.starts_with(&owned_prefix)) .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) .collect::, Box<[u8]>)>>(); iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) } if let Some(removes) = self.remove_cache.get(index) { let to_filter = removes .iter() .filter(move |k| k.starts_with(&prefix)) .cloned() .collect::>>(); iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) } iter } #[allow(unused)] pub fn flush_timed(&mut self) -> Result<()> { if self.changes_count > 0 { if Instant::now() - self.last_flush > self.max_time_window { self.flush_changes()?; } } Ok(()) } } #[cfg(feature = "persy")] impl Drop for WriteCache { fn drop(&mut self) { if self.changes_count > 0 { self.flush_changes().unwrap(); } } } #[cfg(feature = "persy")] struct UnionIter, T1: Iterator, I> { first: Peekable, second: Peekable, backwards: bool, } #[cfg(feature = "persy")] impl, T1: Iterator, I> UnionIter { fn new(first: T, second: T1, backwards: bool) -> Self { UnionIter { first: first.peekable(), second: second.peekable(), backwards, } } } #[cfg(feature = "persy")] impl Iterator for UnionIter where T: Iterator, T1: Iterator, { type Item = (K, V); fn next(&mut self) -> Option { if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { if self.backwards { match f.0.cmp(&s.0) { Ordering::Less => self.second.next(), Ordering::Greater => self.first.next(), Ordering::Equal => { self.first.next(); self.second.next() } } } else { match f.0.cmp(&s.0) { Ordering::Less => self.first.next(), Ordering::Greater => self.second.next(), Ordering::Equal => { self.first.next(); self.second.next() } } } } else { self.first.next().or_else(|| self.second.next()) } } } #[cfg(feature = "persy")] impl Tree for PersyTree { fn get(&self, key: &[u8]) -> Result>> { let result = self .persy .get::(&self.name, &ByteVec(key.to_vec()))? .map(|v| v.into_iter().map(|bv| bv.0).next()) .flatten(); let result = self .write_cache .read() .unwrap() .get(&self.name, key, result)?; Ok(result) } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { let watchers = self.watchers.read().unwrap(); let mut triggered = Vec::new(); for length in 0..=key.len() { if watchers.contains_key(&key[..length]) { triggered.push(&key[..length]); } } drop(watchers); if !triggered.is_empty() { let mut watchers = self.watchers.write().unwrap(); for prefix in triggered { if let Some(txs) = watchers.remove(prefix) { for tx in txs { let _ = tx.send(()); } } } } self.write_cache .write() .unwrap() .insert(self.name.clone(), key, value)?; Ok(()) } fn remove(&self, key: &[u8]) -> Result<()> { self.write_cache .write() .unwrap() .remove(self.name.clone(), key)?; Ok(()) } fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { let result = Box::new( self.persy .range::(&self.name, ..) .unwrap() .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), ); self.write_cache.read().unwrap().iter(&self.name, result) } fn iter_from<'a>( &'a self, from: &[u8], backwards: bool, ) -> Box, Box<[u8]>)> + 'a> { let iter = if backwards { self.persy .range::(&self.name, ..ByteVec(from.to_owned())) .unwrap() } else { self.persy .range::(&self.name, ByteVec(from.to_owned())..) .unwrap() }; let map = iter.filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }); let result: Box, Box<[u8]>)>> = if backwards { Box::new(map.rev()) } else { Box::new(map) }; self.write_cache .read() .unwrap() .iter_from(&self.name, from, backwards, result) } fn increment(&self, key: &[u8]) -> Result> { let old = self.get(key)?; let new = utils::increment(old.as_deref()).unwrap(); self.insert(key, &new)?; Ok(new) } fn scan_prefix<'a>( &'a self, prefix: Vec, ) -> Box, Box<[u8]>)> + Send + 'a> { let range_prefix = ByteVec(prefix.to_owned()); let owned_prefix = prefix.clone(); let result = Box::new( self.persy .range::(&self.name, range_prefix..) .unwrap() .take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) .filter_map(|(k, v)| { v.into_iter() .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .next() }), ); self.write_cache .read() .unwrap() .scan_prefix(&self.name, prefix, result) } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { let (tx, rx) = tokio::sync::oneshot::channel(); self.watchers .write() .unwrap() .entry(prefix.to_vec()) .or_default() .push(tx); Box::pin(async move { // Tx is never destroyed rx.await.unwrap(); }) } }