Browse Source

refactored to use programmatic flush instead of timed flush of the write cache

merge-requests/107/head
Tglman 5 years ago
parent
commit
03a0d51ab4
  1. 2
      Cargo.toml
  2. 1
      src/database/abstraction.rs
  3. 47
      src/database/abstraction/persy.rs

2
Cargo.toml

@ -89,7 +89,7 @@ thread_local = "1.1.3"
[features] [features]
default = ["conduit_bin", "backend_sqlite"] default = ["conduit_bin", "backend_sqlite"]
backend_sled = ["sled"] backend_sled = ["sled"]
backend_persy = ["persy"] backend_persy = ["persy","num_cpus"]
backend_sqlite = ["sqlite"] backend_sqlite = ["sqlite"]
backend_heed = ["heed", "crossbeam"] backend_heed = ["heed", "crossbeam"]
sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"]

1
src/database/abstraction.rs

@ -49,4 +49,3 @@ pub trait Tree: Send + Sync {
Ok(()) Ok(())
} }
} }

47
src/database/abstraction/persy.rs

@ -1,29 +1,31 @@
use crate::{
database::{
abstraction::{DatabaseEngine, Tree},
Config,
},
Result,
};
use persy::{ByteVec, OpenOptions, Persy, ValueMode}; use persy::{ByteVec, OpenOptions, Persy, ValueMode};
use timer::Timer;
use crate::database::abstraction::{DatabaseEngine, Tree};
use crate::database::Config;
use crate::Result;
use std::{ use std::{
future::Future, pin::Pin,
cmp::Ordering, cmp::Ordering,
collections::{BTreeMap,btree_map::Entry, BTreeSet}, collections::{btree_map::Entry, BTreeMap, BTreeSet},
future::Future,
iter::Peekable, iter::Peekable,
pin::Pin,
sync::{Arc, RwLock},
time::{Duration, Instant}, time::{Duration, Instant},
sync::{RwLock, Arc}
}; };
pub struct PersyEngine { pub struct PersyEngine {
persy: Persy, persy: Persy,
write_cache: Arc<RwLock<WriteCache>>, write_cache: Arc<RwLock<WriteCache>>,
#[allow(unused)]
timer: Timer,
} }
impl DatabaseEngine for PersyEngine { impl DatabaseEngine for PersyEngine {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Arc<Self>> {
let mut cfg = persy::Config::new(); let mut cfg = persy::Config::new();
cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64);
let persy = OpenOptions::new() let persy = OpenOptions::new()
.create(true) .create(true)
@ -36,17 +38,16 @@ impl DatabaseEngine for PersyEngine {
write_cache_size, write_cache_size,
Duration::from_millis(write_cache_window_millisecs), Duration::from_millis(write_cache_window_millisecs),
))); )));
/*
use timer::Timer;
let timer = Timer::new(); let timer = Timer::new();
let timer_write_cache = write_cache.clone(); let timer_write_cache = write_cache.clone();
timer.schedule_repeating( timer.schedule_repeating(
chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64),
move || timer_write_cache.write().unwrap().flush_timed().unwrap(), move || timer_write_cache.write().unwrap().flush_timed().unwrap(),
); );
Ok(Arc::new(PersyEngine { */
persy, Ok(Arc::new(PersyEngine { persy, write_cache }))
write_cache,
timer,
}))
} }
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
@ -66,6 +67,7 @@ impl DatabaseEngine for PersyEngine {
} }
fn flush(self: &Arc<Self>) -> Result<()> { fn flush(self: &Arc<Self>) -> Result<()> {
self.write_cache.write().unwrap().flush_changes()?;
Ok(()) Ok(())
} }
} }
@ -299,23 +301,12 @@ impl WriteCache {
} }
} }
#[cfg(feature = "persy")]
impl Drop for WriteCache {
fn drop(&mut self) {
if self.changes_count > 0 {
self.flush_changes().unwrap();
}
}
}
#[cfg(feature = "persy")]
struct UnionIter<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> { struct UnionIter<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> {
first: Peekable<T>, first: Peekable<T>,
second: Peekable<T1>, second: Peekable<T1>,
backwards: bool, backwards: bool,
} }
#[cfg(feature = "persy")]
impl<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> UnionIter<T, T1, I> { impl<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> UnionIter<T, T1, I> {
fn new(first: T, second: T1, backwards: bool) -> Self { fn new(first: T, second: T1, backwards: bool) -> Self {
UnionIter { UnionIter {
@ -326,7 +317,6 @@ impl<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> UnionIter<T, T1, I> {
} }
} }
#[cfg(feature = "persy")]
impl<K: Ord, V, T, T1> Iterator for UnionIter<T, T1, (K, V)> impl<K: Ord, V, T, T1> Iterator for UnionIter<T, T1, (K, V)>
where where
T: Iterator<Item = (K, V)>, T: Iterator<Item = (K, V)>,
@ -450,7 +440,8 @@ impl Tree for PersyTree {
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) .map(|val| (k.0.to_owned().into(), val.0.to_owned().into()))
.next() .next()
}); });
let result: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send+'a > = if backwards { let result: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> = if backwards
{
Box::new(map.rev()) Box::new(map.rev())
} else { } else {
Box::new(map) Box::new(map)

Loading…
Cancel
Save