6 changed files with 626 additions and 0 deletions
@ -0,0 +1,535 @@
@@ -0,0 +1,535 @@
|
||||
use crate::{ |
||||
database::{ |
||||
abstraction::{DatabaseEngine, Tree}, |
||||
Config, |
||||
}, |
||||
Result, |
||||
}; |
||||
use persy::{ByteVec, OpenOptions, Persy, ValueMode}; |
||||
|
||||
use std::{ |
||||
cmp::Ordering, |
||||
collections::{btree_map::Entry, BTreeMap, BTreeSet}, |
||||
future::Future, |
||||
iter::Peekable, |
||||
pin::Pin, |
||||
sync::{Arc, RwLock}, |
||||
time::{Duration, Instant}, |
||||
}; |
||||
|
||||
use tracing::warn; |
||||
|
||||
pub struct PersyEngine { |
||||
persy: Persy, |
||||
write_cache: Arc<RwLock<WriteCache>>, |
||||
} |
||||
|
||||
impl DatabaseEngine for PersyEngine { |
||||
fn open(config: &Config) -> Result<Arc<Self>> { |
||||
let mut cfg = persy::Config::new(); |
||||
cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); |
||||
|
||||
let persy = OpenOptions::new() |
||||
.create(true) |
||||
.config(cfg) |
||||
.open(&format!("{}/db.persy", config.database_path))?; |
||||
let write_cache_size = 1000; |
||||
let write_cache_window_millisecs = 1000; |
||||
let write_cache = Arc::new(RwLock::new(WriteCache::new( |
||||
&persy, |
||||
write_cache_size, |
||||
Duration::from_millis(write_cache_window_millisecs), |
||||
))); |
||||
/* |
||||
use timer::Timer; |
||||
let timer = Timer::new(); |
||||
let timer_write_cache = write_cache.clone(); |
||||
timer.schedule_repeating( |
||||
chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), |
||||
move || timer_write_cache.write().unwrap().flush_timed().unwrap(), |
||||
); |
||||
*/ |
||||
Ok(Arc::new(PersyEngine { persy, write_cache })) |
||||
} |
||||
|
||||
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { |
||||
// Create if it doesn't exist
|
||||
if !self.persy.exists_index(name)? { |
||||
let mut tx = self.persy.begin()?; |
||||
tx.create_index::<ByteVec, ByteVec>(name, ValueMode::Replace)?; |
||||
tx.prepare()?.commit()?; |
||||
} |
||||
|
||||
Ok(Arc::new(PersyTree { |
||||
persy: self.persy.clone(), |
||||
name: name.to_owned(), |
||||
watchers: RwLock::new(BTreeMap::new()), |
||||
write_cache: self.write_cache.clone(), |
||||
})) |
||||
} |
||||
|
||||
fn flush(self: &Arc<Self>) -> Result<()> { |
||||
self.write_cache.write().unwrap().flush_changes()?; |
||||
Ok(()) |
||||
} |
||||
} |
||||
|
||||
pub struct PersyTree { |
||||
persy: Persy, |
||||
name: String, |
||||
watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>, |
||||
write_cache: Arc<RwLock<WriteCache>>, |
||||
} |
||||
|
||||
pub struct WriteCache { |
||||
add_cache: BTreeMap<String, BTreeMap<Vec<u8>, Vec<u8>>>, |
||||
remove_cache: BTreeMap<String, BTreeSet<Vec<u8>>>, |
||||
changes_count: u32, |
||||
last_flush: Instant, |
||||
persy: Persy, |
||||
max_size: u32, |
||||
max_time_window: Duration, |
||||
} |
||||
|
||||
impl WriteCache { |
||||
fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { |
||||
Self { |
||||
add_cache: Default::default(), |
||||
remove_cache: Default::default(), |
||||
changes_count: Default::default(), |
||||
last_flush: Instant::now(), |
||||
persy: persy.clone(), |
||||
max_size, |
||||
max_time_window, |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl WriteCache { |
||||
pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { |
||||
match self.add_cache.entry(index.clone()) { |
||||
Entry::Vacant(s) => { |
||||
let mut map = BTreeMap::new(); |
||||
map.insert(key.to_owned(), value.to_owned()); |
||||
s.insert(map); |
||||
} |
||||
Entry::Occupied(mut o) => { |
||||
o.get_mut().insert(key.to_owned(), value.to_owned()); |
||||
} |
||||
} |
||||
self.remove_remove(index, key)?; |
||||
self.check_and_flush()?; |
||||
Ok(()) |
||||
} |
||||
|
||||
pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { |
||||
match self.remove_cache.entry(index) { |
||||
Entry::Vacant(_) => {} |
||||
Entry::Occupied(mut o) => { |
||||
o.get_mut().remove(key); |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { |
||||
match self.add_cache.entry(index) { |
||||
Entry::Vacant(_) => {} |
||||
Entry::Occupied(mut o) => { |
||||
o.get_mut().remove(key); |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { |
||||
match self.remove_cache.entry(index.clone()) { |
||||
Entry::Vacant(s) => { |
||||
let mut map = BTreeSet::new(); |
||||
map.insert(key.to_owned()); |
||||
s.insert(map); |
||||
} |
||||
Entry::Occupied(mut o) => { |
||||
o.get_mut().insert(key.to_owned()); |
||||
} |
||||
} |
||||
self.remove_insert(index, key)?; |
||||
self.check_and_flush()?; |
||||
Ok(()) |
||||
} |
||||
pub fn check_and_flush(&mut self) -> Result<()> { |
||||
self.changes_count += 1; |
||||
if self.changes_count > self.max_size { |
||||
self.flush_changes()?; |
||||
self.changes_count = 0; |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub fn get(&self, index: &str, key: &[u8], value: Option<Vec<u8>>) -> Result<Option<Vec<u8>>> { |
||||
Ok(if let Some(changes) = self.add_cache.get(index) { |
||||
changes.get(key).map(|v| v.to_owned()).or(value) |
||||
} else if let Some(remove) = self.remove_cache.get(index) { |
||||
if remove.contains(key) { |
||||
None |
||||
} else { |
||||
value |
||||
} |
||||
} else { |
||||
value |
||||
}) |
||||
} |
||||
|
||||
fn flush_changes(&mut self) -> Result<()> { |
||||
let mut tx = self.persy.begin()?; |
||||
|
||||
for (index, changes) in &self.add_cache { |
||||
for (key, value) in changes { |
||||
tx.put::<ByteVec, ByteVec>( |
||||
&index, |
||||
ByteVec::new(key.to_owned()), |
||||
ByteVec::new(value.to_owned()), |
||||
)?; |
||||
} |
||||
} |
||||
self.add_cache.clear(); |
||||
for (index, changes) in &self.remove_cache { |
||||
for key in changes { |
||||
tx.remove::<ByteVec, ByteVec>(&index, ByteVec::new(key.to_owned()), None)?; |
||||
} |
||||
} |
||||
self.remove_cache.clear(); |
||||
tx.prepare()?.commit()?; |
||||
self.last_flush = Instant::now(); |
||||
Ok(()) |
||||
} |
||||
|
||||
pub fn iter<'a>( |
||||
&self, |
||||
index: &str, |
||||
mut iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + Sync + 'a>, |
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + Sync + 'a> { |
||||
if let Some(adds) = self.add_cache.get(index) { |
||||
let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); |
||||
iter = Box::new(UnionIter::new(iter, added, false)) |
||||
} |
||||
|
||||
if let Some(removes) = self.remove_cache.get(index) { |
||||
let to_filter = removes.clone(); |
||||
iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) |
||||
} |
||||
|
||||
iter |
||||
} |
||||
|
||||
fn iter_from<'a>( |
||||
&self, |
||||
index: &str, |
||||
from: &[u8], |
||||
backwards: bool, |
||||
mut iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a>, |
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { |
||||
if let Some(adds) = self.add_cache.get(index) { |
||||
let range = if backwards { |
||||
adds.range(..from.to_owned()) |
||||
} else { |
||||
adds.range(from.to_owned()..) |
||||
}; |
||||
let added = range |
||||
.map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) |
||||
.collect::<Vec<(Vec<u8>, Vec<u8>)>>(); |
||||
let add_iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> = if backwards { |
||||
Box::new(added.into_iter().rev()) |
||||
} else { |
||||
Box::new(added.into_iter()) |
||||
}; |
||||
iter = Box::new(UnionIter::new(iter, add_iter, backwards)) |
||||
} |
||||
|
||||
if let Some(removes) = self.remove_cache.get(index) { |
||||
let owned_from = from.to_owned(); |
||||
let to_filter = removes.iter(); |
||||
let to_filter = if backwards { |
||||
to_filter |
||||
.filter(|x| (..&owned_from).contains(x)) |
||||
.cloned() |
||||
.collect::<Vec<Vec<u8>>>() |
||||
} else { |
||||
to_filter |
||||
.filter(|x| (&owned_from..).contains(x)) |
||||
.cloned() |
||||
.collect::<Vec<Vec<u8>>>() |
||||
}; |
||||
iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) |
||||
} |
||||
iter |
||||
} |
||||
|
||||
fn scan_prefix<'a>( |
||||
&self, |
||||
index: &str, |
||||
prefix: Vec<u8>, |
||||
mut iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a>, |
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { |
||||
if let Some(adds) = self.add_cache.get(index) { |
||||
let owned_prefix = prefix.to_owned(); |
||||
let added = adds |
||||
.range(prefix.to_owned()..) |
||||
.take_while(move |(k, _)| k.starts_with(&owned_prefix)) |
||||
.map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) |
||||
.collect::<Vec<(Vec<u8>, Vec<u8>)>>(); |
||||
iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) |
||||
} |
||||
|
||||
if let Some(removes) = self.remove_cache.get(index) { |
||||
let to_filter = removes |
||||
.iter() |
||||
.filter(move |k| k.starts_with(&prefix)) |
||||
.cloned() |
||||
.collect::<Vec<Vec<u8>>>(); |
||||
iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) |
||||
} |
||||
iter |
||||
} |
||||
|
||||
#[allow(unused)] |
||||
pub fn flush_timed(&mut self) -> Result<()> { |
||||
if self.changes_count > 0 { |
||||
if Instant::now() - self.last_flush > self.max_time_window { |
||||
self.flush_changes()?; |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
} |
||||
|
||||
struct UnionIter<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> { |
||||
first: Peekable<T>, |
||||
second: Peekable<T1>, |
||||
backwards: bool, |
||||
} |
||||
|
||||
impl<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> UnionIter<T, T1, I> { |
||||
fn new(first: T, second: T1, backwards: bool) -> Self { |
||||
UnionIter { |
||||
first: first.peekable(), |
||||
second: second.peekable(), |
||||
backwards, |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl<K: Ord, V, T, T1> Iterator for UnionIter<T, T1, (K, V)> |
||||
where |
||||
T: Iterator<Item = (K, V)>, |
||||
T1: Iterator<Item = (K, V)>, |
||||
{ |
||||
type Item = (K, V); |
||||
fn next(&mut self) -> Option<Self::Item> { |
||||
if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { |
||||
if self.backwards { |
||||
match f.0.cmp(&s.0) { |
||||
Ordering::Less => self.second.next(), |
||||
Ordering::Greater => self.first.next(), |
||||
Ordering::Equal => { |
||||
self.first.next(); |
||||
self.second.next() |
||||
} |
||||
} |
||||
} else { |
||||
match f.0.cmp(&s.0) { |
||||
Ordering::Less => self.first.next(), |
||||
Ordering::Greater => self.second.next(), |
||||
Ordering::Equal => { |
||||
self.first.next(); |
||||
self.second.next() |
||||
} |
||||
} |
||||
} |
||||
} else { |
||||
self.first.next().or_else(|| self.second.next()) |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl Tree for PersyTree { |
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { |
||||
let result = self |
||||
.persy |
||||
.get::<ByteVec, ByteVec>(&self.name, &ByteVec::new(key.to_vec()))? |
||||
.next() |
||||
.map(|v| (*v).to_owned()); |
||||
let result = self |
||||
.write_cache |
||||
.read() |
||||
.unwrap() |
||||
.get(&self.name, key, result)?; |
||||
Ok(result) |
||||
} |
||||
|
||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { |
||||
let watchers = self.watchers.read().unwrap(); |
||||
let mut triggered = Vec::new(); |
||||
|
||||
for length in 0..=key.len() { |
||||
if watchers.contains_key(&key[..length]) { |
||||
triggered.push(&key[..length]); |
||||
} |
||||
} |
||||
|
||||
drop(watchers); |
||||
|
||||
if !triggered.is_empty() { |
||||
let mut watchers = self.watchers.write().unwrap(); |
||||
for prefix in triggered { |
||||
if let Some(txs) = watchers.remove(prefix) { |
||||
for tx in txs { |
||||
let _ = tx.send(()); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
self.write_cache |
||||
.write() |
||||
.unwrap() |
||||
.insert(self.name.clone(), key, value)?; |
||||
Ok(()) |
||||
} |
||||
|
||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> { |
||||
//TODO: evaluate if use instead a single big transaction
|
||||
for (key, value) in iter { |
||||
self.insert(&key, &value)?; |
||||
} |
||||
|
||||
Ok(()) |
||||
} |
||||
|
||||
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> { |
||||
for key in iter { |
||||
self.increment(&key)?; |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
fn remove(&self, key: &[u8]) -> Result<()> { |
||||
self.write_cache |
||||
.write() |
||||
.unwrap() |
||||
.remove(self.name.clone(), key)?; |
||||
Ok(()) |
||||
} |
||||
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { |
||||
let iter = self.persy.range::<ByteVec, ByteVec, _>(&self.name, ..); |
||||
match iter { |
||||
Ok(iter) => { |
||||
let result = Box::new(iter.filter_map(|(k, v)| { |
||||
v.into_iter() |
||||
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) |
||||
.next() |
||||
})); |
||||
|
||||
self.write_cache.read().unwrap().iter(&self.name, result) |
||||
} |
||||
Err(e) => { |
||||
warn!("error iterating {:?}", e); |
||||
Box::new(std::iter::empty()) |
||||
} |
||||
} |
||||
} |
||||
|
||||
fn iter_from<'a>( |
||||
&'a self, |
||||
from: &[u8], |
||||
backwards: bool, |
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { |
||||
let range = if backwards { |
||||
self.persy |
||||
.range::<ByteVec, ByteVec, _>(&self.name, ..ByteVec::new(from.to_owned())) |
||||
} else { |
||||
self.persy |
||||
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec::new(from.to_owned())..) |
||||
}; |
||||
match range { |
||||
Ok(iter) => { |
||||
let map = iter.filter_map(|(k, v)| { |
||||
v.into_iter() |
||||
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) |
||||
.next() |
||||
}); |
||||
let result: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> = if backwards |
||||
{ |
||||
Box::new(map.rev()) |
||||
} else { |
||||
Box::new(map) |
||||
}; |
||||
|
||||
self.write_cache |
||||
.read() |
||||
.unwrap() |
||||
.iter_from(&self.name, from, backwards, result) |
||||
} |
||||
Err(e) => { |
||||
warn!("error iterating with prefix {:?}", e); |
||||
Box::new(std::iter::empty()) |
||||
} |
||||
} |
||||
} |
||||
|
||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> { |
||||
let old = self.get(key)?; |
||||
let new = crate::utils::increment(old.as_deref()).unwrap(); |
||||
self.insert(key, &new)?; |
||||
Ok(new) |
||||
} |
||||
|
||||
fn scan_prefix<'a>( |
||||
&'a self, |
||||
prefix: Vec<u8>, |
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { |
||||
let range_prefix = ByteVec::new(prefix.to_owned()); |
||||
let range = self |
||||
.persy |
||||
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..); |
||||
|
||||
match range { |
||||
Ok(iter) => { |
||||
let owned_prefix = prefix.clone(); |
||||
let result = Box::new( |
||||
iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix)) |
||||
.filter_map(|(k, v)| { |
||||
v.into_iter() |
||||
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) |
||||
.next() |
||||
}), |
||||
); |
||||
|
||||
self.write_cache |
||||
.read() |
||||
.unwrap() |
||||
.scan_prefix(&self.name, prefix, result) |
||||
} |
||||
Err(e) => { |
||||
warn!("error scanning prefix {:?}", e); |
||||
Box::new(std::iter::empty()) |
||||
} |
||||
} |
||||
} |
||||
|
||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
||||
let (tx, rx) = tokio::sync::oneshot::channel(); |
||||
|
||||
self.watchers |
||||
.write() |
||||
.unwrap() |
||||
.entry(prefix.to_vec()) |
||||
.or_default() |
||||
.push(tx); |
||||
|
||||
Box::pin(async move { |
||||
// Tx is never destroyed
|
||||
rx.await.unwrap(); |
||||
}) |
||||
} |
||||
} |
||||
Loading…
Reference in new issue