use super::Config;
use crate::Result;

use std::{future::Future, pin::Pin, sync::Arc};

#[cfg(feature = "persy")]
use crate::utils;
#[cfg(feature = "persy")]
use std::{collections::BTreeMap, sync::RwLock};

#[cfg(feature = "sled")]
pub mod sled;

#[cfg(feature = "sqlite")]
pub mod sqlite;

#[cfg(feature = "heed")]
pub mod heed;

pub trait DatabaseEngine: Sized {
    fn open(config: &Config) -> Result<Arc<Self>>;
    fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>>;
    fn flush(self: &Arc<Self>) -> Result<()>;
}

pub trait Tree: Send + Sync {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()>;
    fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()>;

    fn remove(&self, key: &[u8]) -> Result<()>;

    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;

    fn iter_from<'a>(
        &'a self,
        from: &[u8],
        backwards: bool,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;

    fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
    fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;

    fn scan_prefix<'a>(
        &'a self,
        prefix: Vec<u8>,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;

    fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

    fn clear(&self) -> Result<()> {
        for (key, _) in self.iter() {
            self.remove(&key)?;
        }

        Ok(())
    }
}

// This is a functional integration on par with the current state of the other
// implementations; it should work and provide a base for stability and
// performance testing. Persy should be fairly resilient to crashes and fairly
// lightweight in memory usage. Single-threaded throughput will be low because
// every transaction commit waits for the data to be flushed to disk; multiple
// threads should perform better, although I still only expect a few thousand
// transactions per second.
//
// The current design of the engine abstraction does not allow transactions
// spanning multiple keys, which would reduce latency considerably; however,
// supporting transactions in the abstraction would require a massive refactor.
#[cfg(feature = "persy")]
pub struct PersyEngine(persy::Persy);

#[cfg(feature = "persy")]
impl DatabaseEngine for PersyEngine {
    fn open(config: &Config) -> Result<Arc<Self>> {
        let cfg = persy::Config::new();
        // This can be used to tweak the in-memory cache size (make `cfg` mutable first):
        // cfg.change_cache_size(32 * 1024 * 1024 /* 32 MiB */);

        let persy = persy::OpenOptions::new()
            .create(true)
            .config(cfg)
            .open(&config.database_path)?;
        Ok(Arc::new(PersyEngine(persy)))
    }

    fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
        // Create the index if it doesn't exist yet
        if !self.0.exists_index(name)? {
            let mut tx = self.0.begin()?;
            tx.create_index::<persy::ByteVec, persy::ByteVec>(name, persy::ValueMode::REPLACE)?;
            tx.prepare()?.commit()?;
        }

        Ok(Arc::new(PersyTree {
            db: self.0.clone(),
            name: name.to_owned(),
            watchers: RwLock::new(BTreeMap::new()),
        }))
    }
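
    // Not part of the original draft: `DatabaseEngine` also requires `flush`.
    // A minimal sketch, assuming persy already makes the data durable on every
    // `prepare()?.commit()?` (as the comment above notes), so there is nothing
    // left to flush here.
    fn flush(self: &Arc<Self>) -> Result<()> {
        Ok(())
    }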
}

#[cfg(feature = "persy")]
pub struct PersyTree {
    db: persy::Persy,
    name: String,
    watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>,
}

#[cfg(feature = "persy")]
impl Tree for PersyTree {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        Ok(self
            .db
            .get::<persy::ByteVec, persy::ByteVec>(&self.name, &persy::ByteVec(key.to_vec()))?
            .map(|v| v.into_iter().map(|bv| bv.0).next())
            .flatten())
    }

    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
        // Wake up any watchers registered for a prefix of the inserted key.
        let watchers = self.watchers.read().unwrap();
        let mut triggered = Vec::new();

        for length in 0..=key.len() {
            if watchers.contains_key(&key[..length]) {
                triggered.push(&key[..length]);
            }
        }

        drop(watchers);

        if !triggered.is_empty() {
            let mut watchers = self.watchers.write().unwrap();
            for prefix in triggered {
                if let Some(txs) = watchers.remove(prefix) {
                    for tx in txs {
                        let _ = tx.send(());
                    }
                }
            }
        }

        let mut tx = self.db.begin()?;
        tx.put::<persy::ByteVec, persy::ByteVec>(
            &self.name,
            persy::ByteVec(key.to_owned()),
            persy::ByteVec(value.to_owned()),
        )?;
        tx.prepare()?.commit()?;
        Ok(())
    }

    fn remove(&self, key: &[u8]) -> Result<()> {
        let mut tx = self.db.begin()?;
        tx.remove::<persy::ByteVec, persy::ByteVec>(
            &self.name,
            persy::ByteVec(key.to_owned()),
            None,
        )?;
        tx.prepare()?.commit()?;
        Ok(())
    }

    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
        Box::new(
            self.db
                .range::<persy::ByteVec, persy::ByteVec, _>(&self.name, ..)
                .unwrap()
                .filter_map(|(k, v)| {
                    v.into_iter()
                        .map(|val| (k.0.to_owned(), val.0.to_owned()))
                        .next()
                }),
        )
    }

    fn iter_from<'a>(
        &'a self,
        from: &[u8],
        backwards: bool,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
        // FIXME: the `backwards` flag is not handled yet; the range always
        // runs forward from `from`. Supporting it would need a reversed
        // iteration over `..=from`.
        let _ = backwards;

        Box::new(
            self.db
                .range::<persy::ByteVec, persy::ByteVec, _>(
                    &self.name,
                    persy::ByteVec(from.to_owned())..,
                )
                .unwrap()
                .filter_map(|(k, v)| {
                    v.into_iter()
                        .map(|val| (k.0.to_owned(), val.0.to_owned()))
                        .next()
                }),
        )
    }

    fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
        let old = self.get(key)?;
        let new = utils::increment(old.as_deref()).unwrap();
        self.insert(key, &new)?;
        Ok(new)
    }

    fn scan_prefix<'a>(
        &'a self,
        prefix: Vec<u8>,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
        let range_prefix = persy::ByteVec(prefix.clone());
        Box::new(
            self.db
                .range::<persy::ByteVec, persy::ByteVec, _>(&self.name, range_prefix..)
                .unwrap()
                // Scan forward from the prefix and stop as soon as a key no
                // longer starts with it.
                .take_while(move |(k, _)| k.0.starts_with(&prefix))
                .filter_map(|(k, v)| {
                    v.into_iter()
                        .map(|val| (k.0.to_owned(), val.0.to_owned()))
                        .next()
                }),
        )
    }

    fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let (tx, rx) = tokio::sync::oneshot::channel();

        self.watchers
            .write()
            .unwrap()
            .entry(prefix.to_vec())
            .or_default()
            .push(tx);

        Box::pin(async move {
            // Tx is never destroyed
            rx.await.unwrap();
        })
    }
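
    // Not part of the original draft: `Tree` also requires `insert_batch` and
    // `increment_batch`. Minimal sketches that fall back to one persy
    // transaction per entry; a real implementation could batch everything into
    // a single transaction to cut down on commit latency.
    fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
        for (key, value) in iter {
            self.insert(&key, &value)?;
        }
        Ok(())
    }

    fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
        for key in iter {
            self.increment(&key)?;
        }
        Ok(())
    }
}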