Conduit is a simple, fast and reliable chat server powered by Matrix https://conduit.rs
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
535 lines
16 KiB
535 lines
16 KiB
use crate::{ |
|
database::{ |
|
abstraction::{DatabaseEngine, Tree}, |
|
Config, |
|
}, |
|
Result, |
|
}; |
|
use persy::{ByteVec, OpenOptions, Persy, ValueMode}; |
|
|
|
use std::{ |
|
cmp::Ordering, |
|
collections::{btree_map::Entry, BTreeMap, BTreeSet}, |
|
future::Future, |
|
iter::Peekable, |
|
pin::Pin, |
|
sync::{Arc, RwLock}, |
|
time::{Duration, Instant}, |
|
}; |
|
|
|
use tracing::warn; |
|
|
|
pub struct PersyEngine { |
|
persy: Persy, |
|
write_cache: Arc<RwLock<WriteCache>>, |
|
} |
|
|
|
impl DatabaseEngine for PersyEngine { |
|
fn open(config: &Config) -> Result<Arc<Self>> { |
|
let mut cfg = persy::Config::new(); |
|
cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); |
|
|
|
let persy = OpenOptions::new() |
|
.create(true) |
|
.config(cfg) |
|
.open(&format!("{}/db.persy", config.database_path))?; |
|
let write_cache_size = 1000; |
|
let write_cache_window_millisecs = 1000; |
|
let write_cache = Arc::new(RwLock::new(WriteCache::new( |
|
&persy, |
|
write_cache_size, |
|
Duration::from_millis(write_cache_window_millisecs), |
|
))); |
|
/* |
|
use timer::Timer; |
|
let timer = Timer::new(); |
|
let timer_write_cache = write_cache.clone(); |
|
timer.schedule_repeating( |
|
chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), |
|
move || timer_write_cache.write().unwrap().flush_timed().unwrap(), |
|
); |
|
*/ |
|
Ok(Arc::new(PersyEngine { persy, write_cache })) |
|
} |
|
|
|
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { |
|
// Create if it doesn't exist |
|
if !self.persy.exists_index(name)? { |
|
let mut tx = self.persy.begin()?; |
|
tx.create_index::<ByteVec, ByteVec>(name, ValueMode::Replace)?; |
|
tx.prepare()?.commit()?; |
|
} |
|
|
|
Ok(Arc::new(PersyTree { |
|
persy: self.persy.clone(), |
|
name: name.to_owned(), |
|
watchers: RwLock::new(BTreeMap::new()), |
|
write_cache: self.write_cache.clone(), |
|
})) |
|
} |
|
|
|
fn flush(self: &Arc<Self>) -> Result<()> { |
|
self.write_cache.write().unwrap().flush_changes()?; |
|
Ok(()) |
|
} |
|
} |
|
|
|
pub struct PersyTree { |
|
persy: Persy, |
|
name: String, |
|
watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>, |
|
write_cache: Arc<RwLock<WriteCache>>, |
|
} |
|
|
|
pub struct WriteCache { |
|
add_cache: BTreeMap<String, BTreeMap<Vec<u8>, Vec<u8>>>, |
|
remove_cache: BTreeMap<String, BTreeSet<Vec<u8>>>, |
|
changes_count: u32, |
|
last_flush: Instant, |
|
persy: Persy, |
|
max_size: u32, |
|
max_time_window: Duration, |
|
} |
|
|
|
impl WriteCache { |
|
fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { |
|
Self { |
|
add_cache: Default::default(), |
|
remove_cache: Default::default(), |
|
changes_count: Default::default(), |
|
last_flush: Instant::now(), |
|
persy: persy.clone(), |
|
max_size, |
|
max_time_window, |
|
} |
|
} |
|
} |
|
|
|
impl WriteCache { |
|
pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { |
|
match self.add_cache.entry(index.clone()) { |
|
Entry::Vacant(s) => { |
|
let mut map = BTreeMap::new(); |
|
map.insert(key.to_owned(), value.to_owned()); |
|
s.insert(map); |
|
} |
|
Entry::Occupied(mut o) => { |
|
o.get_mut().insert(key.to_owned(), value.to_owned()); |
|
} |
|
} |
|
self.remove_remove(index, key)?; |
|
self.check_and_flush()?; |
|
Ok(()) |
|
} |
|
|
|
pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { |
|
match self.remove_cache.entry(index) { |
|
Entry::Vacant(_) => {} |
|
Entry::Occupied(mut o) => { |
|
o.get_mut().remove(key); |
|
} |
|
} |
|
Ok(()) |
|
} |
|
|
|
pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { |
|
match self.add_cache.entry(index) { |
|
Entry::Vacant(_) => {} |
|
Entry::Occupied(mut o) => { |
|
o.get_mut().remove(key); |
|
} |
|
} |
|
Ok(()) |
|
} |
|
|
|
pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { |
|
match self.remove_cache.entry(index.clone()) { |
|
Entry::Vacant(s) => { |
|
let mut map = BTreeSet::new(); |
|
map.insert(key.to_owned()); |
|
s.insert(map); |
|
} |
|
Entry::Occupied(mut o) => { |
|
o.get_mut().insert(key.to_owned()); |
|
} |
|
} |
|
self.remove_insert(index, key)?; |
|
self.check_and_flush()?; |
|
Ok(()) |
|
} |
|
pub fn check_and_flush(&mut self) -> Result<()> { |
|
self.changes_count += 1; |
|
if self.changes_count > self.max_size { |
|
self.flush_changes()?; |
|
self.changes_count = 0; |
|
} |
|
Ok(()) |
|
} |
|
|
|
pub fn get(&self, index: &str, key: &[u8], value: Option<Vec<u8>>) -> Result<Option<Vec<u8>>> { |
|
Ok(if let Some(changes) = self.add_cache.get(index) { |
|
changes.get(key).map(|v| v.to_owned()).or(value) |
|
} else if let Some(remove) = self.remove_cache.get(index) { |
|
if remove.contains(key) { |
|
None |
|
} else { |
|
value |
|
} |
|
} else { |
|
value |
|
}) |
|
} |
|
|
|
fn flush_changes(&mut self) -> Result<()> { |
|
let mut tx = self.persy.begin()?; |
|
|
|
for (index, changes) in &self.add_cache { |
|
for (key, value) in changes { |
|
tx.put::<ByteVec, ByteVec>( |
|
&index, |
|
ByteVec::new(key.to_owned()), |
|
ByteVec::new(value.to_owned()), |
|
)?; |
|
} |
|
} |
|
self.add_cache.clear(); |
|
for (index, changes) in &self.remove_cache { |
|
for key in changes { |
|
tx.remove::<ByteVec, ByteVec>(&index, ByteVec::new(key.to_owned()), None)?; |
|
} |
|
} |
|
self.remove_cache.clear(); |
|
tx.prepare()?.commit()?; |
|
self.last_flush = Instant::now(); |
|
Ok(()) |
|
} |
|
|
|
pub fn iter<'a>( |
|
&self, |
|
index: &str, |
|
mut iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + Sync + 'a>, |
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + Sync + 'a> { |
|
if let Some(adds) = self.add_cache.get(index) { |
|
let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); |
|
iter = Box::new(UnionIter::new(iter, added, false)) |
|
} |
|
|
|
if let Some(removes) = self.remove_cache.get(index) { |
|
let to_filter = removes.clone(); |
|
iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) |
|
} |
|
|
|
iter |
|
} |
|
|
|
fn iter_from<'a>( |
|
&self, |
|
index: &str, |
|
from: &[u8], |
|
backwards: bool, |
|
mut iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a>, |
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { |
|
if let Some(adds) = self.add_cache.get(index) { |
|
let range = if backwards { |
|
adds.range(..from.to_owned()) |
|
} else { |
|
adds.range(from.to_owned()..) |
|
}; |
|
let added = range |
|
.map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) |
|
.collect::<Vec<(Vec<u8>, Vec<u8>)>>(); |
|
let add_iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> = if backwards { |
|
Box::new(added.into_iter().rev()) |
|
} else { |
|
Box::new(added.into_iter()) |
|
}; |
|
iter = Box::new(UnionIter::new(iter, add_iter, backwards)) |
|
} |
|
|
|
if let Some(removes) = self.remove_cache.get(index) { |
|
let owned_from = from.to_owned(); |
|
let to_filter = removes.iter(); |
|
let to_filter = if backwards { |
|
to_filter |
|
.filter(|x| (..&owned_from).contains(x)) |
|
.cloned() |
|
.collect::<Vec<Vec<u8>>>() |
|
} else { |
|
to_filter |
|
.filter(|x| (&owned_from..).contains(x)) |
|
.cloned() |
|
.collect::<Vec<Vec<u8>>>() |
|
}; |
|
iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) |
|
} |
|
iter |
|
} |
|
|
|
fn scan_prefix<'a>( |
|
&self, |
|
index: &str, |
|
prefix: Vec<u8>, |
|
mut iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a>, |
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { |
|
if let Some(adds) = self.add_cache.get(index) { |
|
let owned_prefix = prefix.to_owned(); |
|
let added = adds |
|
.range(prefix.to_owned()..) |
|
.take_while(move |(k, _)| k.starts_with(&owned_prefix)) |
|
.map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) |
|
.collect::<Vec<(Vec<u8>, Vec<u8>)>>(); |
|
iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) |
|
} |
|
|
|
if let Some(removes) = self.remove_cache.get(index) { |
|
let to_filter = removes |
|
.iter() |
|
.filter(move |k| k.starts_with(&prefix)) |
|
.cloned() |
|
.collect::<Vec<Vec<u8>>>(); |
|
iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) |
|
} |
|
iter |
|
} |
|
|
|
#[allow(unused)] |
|
pub fn flush_timed(&mut self) -> Result<()> { |
|
if self.changes_count > 0 { |
|
if Instant::now() - self.last_flush > self.max_time_window { |
|
self.flush_changes()?; |
|
} |
|
} |
|
Ok(()) |
|
} |
|
} |
|
|
|
/// Merges two key-sorted iterators of `(key, value)` pairs into a single sorted stream.
///
/// The result is only ordered if both inputs are already sorted by key in the same direction:
/// ascending, or descending when `backwards` is set. A key present on both sides is yielded
/// once, with the entry taken from `second`.
struct UnionIter<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> {
    first: Peekable<T>,
    second: Peekable<T1>,
    backwards: bool,
}

impl<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> UnionIter<T, T1, I> {
    /// Wraps both inputs so their next element can be inspected without consuming it.
    fn new(first: T, second: T1, backwards: bool) -> Self {
        Self {
            first: first.peekable(),
            second: second.peekable(),
            backwards,
        }
    }
}

impl<K: Ord, V, T, T1> Iterator for UnionIter<T, T1, (K, V)>
where
    T: Iterator<Item = (K, V)>,
    T1: Iterator<Item = (K, V)>,
{
    type Item = (K, V);

    fn next(&mut self) -> Option<Self::Item> {
        let order = match (self.first.peek(), self.second.peek()) {
            (Some(left), Some(right)) => left.0.cmp(&right.0),
            // At most one side still has entries: drain `first`, then `second`.
            _ => return self.first.next().or_else(|| self.second.next()),
        };

        let take_first = match order {
            Ordering::Equal => {
                // Same key on both sides: discard the entry of `first`, keep the one of `second`.
                self.first.next();
                return self.second.next();
            }
            // Ascending output emits the smaller key first, descending output the larger one.
            Ordering::Less => !self.backwards,
            Ordering::Greater => self.backwards,
        };

        if take_first {
            self.first.next()
        } else {
            self.second.next()
        }
    }
}
|
|
|
impl Tree for PersyTree { |
|
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { |
|
let result = self |
|
.persy |
|
.get::<ByteVec, ByteVec>(&self.name, &ByteVec::new(key.to_vec()))? |
|
.next() |
|
.map(|v| (*v).to_owned()); |
|
let result = self |
|
.write_cache |
|
.read() |
|
.unwrap() |
|
.get(&self.name, key, result)?; |
|
Ok(result) |
|
} |
|
|
|
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { |
|
let watchers = self.watchers.read().unwrap(); |
|
let mut triggered = Vec::new(); |
|
|
|
for length in 0..=key.len() { |
|
if watchers.contains_key(&key[..length]) { |
|
triggered.push(&key[..length]); |
|
} |
|
} |
|
|
|
drop(watchers); |
|
|
|
if !triggered.is_empty() { |
|
let mut watchers = self.watchers.write().unwrap(); |
|
for prefix in triggered { |
|
if let Some(txs) = watchers.remove(prefix) { |
|
for tx in txs { |
|
let _ = tx.send(()); |
|
} |
|
} |
|
} |
|
} |
|
self.write_cache |
|
.write() |
|
.unwrap() |
|
.insert(self.name.clone(), key, value)?; |
|
Ok(()) |
|
} |
|
|
|
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> { |
|
//TODO: evaluate if use instead a single big transaction |
|
for (key, value) in iter { |
|
self.insert(&key, &value)?; |
|
} |
|
|
|
Ok(()) |
|
} |
|
|
|
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> { |
|
for key in iter { |
|
self.increment(&key)?; |
|
} |
|
Ok(()) |
|
} |
|
|
|
fn remove(&self, key: &[u8]) -> Result<()> { |
|
self.write_cache |
|
.write() |
|
.unwrap() |
|
.remove(self.name.clone(), key)?; |
|
Ok(()) |
|
} |
|
|
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { |
|
let iter = self.persy.range::<ByteVec, ByteVec, _>(&self.name, ..); |
|
match iter { |
|
Ok(iter) => { |
|
let result = Box::new(iter.filter_map(|(k, v)| { |
|
v.into_iter() |
|
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) |
|
.next() |
|
})); |
|
|
|
self.write_cache.read().unwrap().iter(&self.name, result) |
|
} |
|
Err(e) => { |
|
warn!("error iterating {:?}", e); |
|
Box::new(std::iter::empty()) |
|
} |
|
} |
|
} |
|
|
|
fn iter_from<'a>( |
|
&'a self, |
|
from: &[u8], |
|
backwards: bool, |
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { |
|
let range = if backwards { |
|
self.persy |
|
.range::<ByteVec, ByteVec, _>(&self.name, ..ByteVec::new(from.to_owned())) |
|
} else { |
|
self.persy |
|
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec::new(from.to_owned())..) |
|
}; |
|
match range { |
|
Ok(iter) => { |
|
let map = iter.filter_map(|(k, v)| { |
|
v.into_iter() |
|
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) |
|
.next() |
|
}); |
|
let result: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> = if backwards |
|
{ |
|
Box::new(map.rev()) |
|
} else { |
|
Box::new(map) |
|
}; |
|
|
|
self.write_cache |
|
.read() |
|
.unwrap() |
|
.iter_from(&self.name, from, backwards, result) |
|
} |
|
Err(e) => { |
|
warn!("error iterating with prefix {:?}", e); |
|
Box::new(std::iter::empty()) |
|
} |
|
} |
|
} |
|
|
|
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> { |
|
let old = self.get(key)?; |
|
let new = crate::utils::increment(old.as_deref()).unwrap(); |
|
self.insert(key, &new)?; |
|
Ok(new) |
|
} |
|
|
|
fn scan_prefix<'a>( |
|
&'a self, |
|
prefix: Vec<u8>, |
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { |
|
let range_prefix = ByteVec::new(prefix.to_owned()); |
|
let range = self |
|
.persy |
|
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..); |
|
|
|
match range { |
|
Ok(iter) => { |
|
let owned_prefix = prefix.clone(); |
|
let result = Box::new( |
|
iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix)) |
|
.filter_map(|(k, v)| { |
|
v.into_iter() |
|
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) |
|
.next() |
|
}), |
|
); |
|
|
|
self.write_cache |
|
.read() |
|
.unwrap() |
|
.scan_prefix(&self.name, prefix, result) |
|
} |
|
Err(e) => { |
|
warn!("error scanning prefix {:?}", e); |
|
Box::new(std::iter::empty()) |
|
} |
|
} |
|
} |
|
|
|
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
|
let (tx, rx) = tokio::sync::oneshot::channel(); |
|
|
|
self.watchers |
|
.write() |
|
.unwrap() |
|
.entry(prefix.to_vec()) |
|
.or_default() |
|
.push(tx); |
|
|
|
Box::pin(async move { |
|
// Tx is never destroyed |
|
rx.await.unwrap(); |
|
}) |
|
} |
|
}
|
|
|