Browse Source

cleanup of persy integration, add timer for background flush of write cache

merge-requests/107/merge^2
Tglman 5 years ago
parent
commit
44bc040e2f
  1. 5
      Cargo.toml
  2. 119
      src/database/abstraction.rs

5
Cargo.toml

@ -28,9 +28,10 @@ tokio = "1.8.2" @@ -28,9 +28,10 @@ tokio = "1.8.2"
# Used for storing data permanently
sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true }
#sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] }
# Used for storing data permanently
persy = { version = "0.11", optional = true }
# Used by the persy write cache for background flush
timer = "0.2"
chrono = "0.4"
# Used for the http request / response body type for Ruma endpoints used with reqwest
bytes = "1.0.1"

119
src/database/abstraction.rs

@ -9,11 +9,17 @@ use std::{collections::BTreeMap, sync::RwLock}; @@ -9,11 +9,17 @@ use std::{collections::BTreeMap, sync::RwLock};
#[cfg(feature = "persy")]
use std::{
cmp::Ordering,
collections::BTreeSet,
collections::{btree_map::Entry, BTreeSet},
iter::Peekable,
time::{Duration, Instant},
};
#[cfg(feature = "persy")]
use persy::{ByteVec, OpenOptions, Persy, ValueMode};
#[cfg(feature = "persy")]
use timer::Timer;
#[cfg(feature = "sled")]
pub mod sled;
@ -64,62 +70,64 @@ pub trait Tree: Send + Sync { @@ -64,62 +70,64 @@ pub trait Tree: Send + Sync {
}
}
// This is a functional integration at the state of the art of the current
// implementations, it should work and provide the base for stability and performance
// testing, Persy should be pretty resilient to crash and pretty lightweight in memory usage
// the speed in single thread will be pretty low because each transaction commit will wait for data
// to be flushed on disk, multi-thread should guarantee better performances even though I expect
// speed of a few thousand transactions per second.
//
// The current design of the engine right now do not allow to do transactions with multiple keys
// that would allow to reduce the latency quite a lot, anyway support transaction in the engine
// require a massive refactor.
#[cfg(feature = "persy")]
pub struct PersyEngine(persy::Persy, Arc<RwLock<WriteCache>>);
pub struct PersyEngine {
persy: Persy,
write_cache: Arc<RwLock<WriteCache>>,
#[allow(unused)]
timer: Timer,
}
#[cfg(feature = "persy")]
impl DatabaseEngine for PersyEngine {
fn open(config: &Config) -> Result<Arc<Self>> {
let cfg = persy::Config::new();
// This is for tweak the in memory cache size
//config.change_cache_size(32 * 1024 * 1024 /*32Mb*/)
let mut cfg = persy::Config::new();
cfg.change_cache_size(config.cache_capacity as u64);
let persy = persy::OpenOptions::new()
let persy = OpenOptions::new()
.create(true)
.config(cfg)
.open(&format!("{}/db.persy", config.database_path))?;
let write_cache = Arc::new(RwLock::new(WriteCache {
add_cache: Default::default(),
remove_cache: Default::default(),
changes_count: Default::default(),
last_flush: Instant::now(),
db: persy.clone(),
}));
Ok(Arc::new(PersyEngine(persy, write_cache)))
let write_cache_size = 1000;
let write_cache_window_millisecs = 1000;
let write_cache = Arc::new(RwLock::new(WriteCache::new(
&persy,
write_cache_size,
Duration::from_millis(write_cache_window_millisecs),
)));
let timer = Timer::new();
let timer_write_cache = write_cache.clone();
timer.schedule_repeating(
chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64),
move || timer_write_cache.write().unwrap().flush_timed().unwrap(),
);
Ok(Arc::new(PersyEngine {
persy,
write_cache,
timer,
}))
}
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
// Create if it doesn't exist
if !self.0.exists_index(name)? {
let mut tx = self.0.begin()?;
tx.create_index::<persy::ByteVec, persy::ByteVec>(name, persy::ValueMode::REPLACE)?;
if !self.persy.exists_index(name)? {
let mut tx = self.persy.begin()?;
tx.create_index::<ByteVec, ByteVec>(name, ValueMode::REPLACE)?;
tx.prepare()?.commit()?;
}
Ok(Arc::new(PersyTree {
db: self.0.clone(),
persy: self.persy.clone(),
name: name.to_owned(),
watchers: RwLock::new(BTreeMap::new()),
write_cache: self.1.clone(),
write_cache: self.write_cache.clone(),
}))
}
}
#[cfg(feature = "persy")]
pub struct PersyTree {
db: persy::Persy,
persy: Persy,
name: String,
watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>,
write_cache: Arc<RwLock<WriteCache>>,
@ -129,15 +137,31 @@ pub struct PersyTree { @@ -129,15 +137,31 @@ pub struct PersyTree {
pub struct WriteCache {
add_cache: BTreeMap<String, BTreeMap<Vec<u8>, Vec<u8>>>,
remove_cache: BTreeMap<String, BTreeSet<Vec<u8>>>,
changes_count: i32,
changes_count: u32,
last_flush: Instant,
db: persy::Persy,
persy: Persy,
max_size: u32,
max_time_window: Duration,
}
#[cfg(feature = "persy")]
impl WriteCache {
fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self {
Self {
add_cache: Default::default(),
remove_cache: Default::default(),
changes_count: Default::default(),
last_flush: Instant::now(),
persy: persy.clone(),
max_size,
max_time_window,
}
}
}
#[cfg(feature = "persy")]
impl WriteCache {
pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> {
use std::collections::btree_map::Entry;
match self.add_cache.entry(index.clone()) {
Entry::Vacant(s) => {
let mut map = BTreeMap::new();
@ -154,7 +178,6 @@ impl WriteCache { @@ -154,7 +178,6 @@ impl WriteCache {
}
pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> {
use std::collections::btree_map::Entry;
match self.remove_cache.entry(index) {
Entry::Vacant(_) => {}
Entry::Occupied(mut o) => {
@ -165,7 +188,6 @@ impl WriteCache { @@ -165,7 +188,6 @@ impl WriteCache {
}
pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> {
use std::collections::btree_map::Entry;
match self.add_cache.entry(index) {
Entry::Vacant(_) => {}
Entry::Occupied(mut o) => {
@ -176,7 +198,6 @@ impl WriteCache { @@ -176,7 +198,6 @@ impl WriteCache {
}
pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> {
use std::collections::btree_map::Entry;
match self.remove_cache.entry(index.clone()) {
Entry::Vacant(s) => {
let mut map = BTreeSet::new();
@ -193,7 +214,7 @@ impl WriteCache { @@ -193,7 +214,7 @@ impl WriteCache {
}
pub fn check_and_flush(&mut self) -> Result<()> {
self.changes_count += 1;
if self.changes_count > 1000 {
if self.changes_count > self.max_size {
self.flush_changes()?;
self.changes_count = 0;
}
@ -215,8 +236,7 @@ impl WriteCache { @@ -215,8 +236,7 @@ impl WriteCache {
}
fn flush_changes(&mut self) -> Result<()> {
use persy::ByteVec;
let mut tx = self.db.begin()?;
let mut tx = self.persy.begin()?;
for (index, changes) in &self.add_cache {
for (key, value) in changes {
@ -330,7 +350,7 @@ impl WriteCache { @@ -330,7 +350,7 @@ impl WriteCache {
#[allow(unused)]
pub fn flush_timed(&mut self) -> Result<()> {
if self.changes_count > 0 {
if Instant::now() - self.last_flush > Duration::from_secs(2) {
if Instant::now() - self.last_flush > self.max_time_window {
self.flush_changes()?;
}
}
@ -402,9 +422,8 @@ where @@ -402,9 +422,8 @@ where
#[cfg(feature = "persy")]
impl Tree for PersyTree {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
use persy::ByteVec;
let result = self
.db
.persy
.get::<ByteVec, ByteVec>(&self.name, &ByteVec(key.to_vec()))?
.map(|v| v.into_iter().map(|bv| bv.0).next())
.flatten();
@ -454,9 +473,8 @@ impl Tree for PersyTree { @@ -454,9 +473,8 @@ impl Tree for PersyTree {
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + Sync + 'a> {
use persy::ByteVec;
let result = Box::new(
self.db
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, ..)
.unwrap()
.filter_map(|(k, v)| {
@ -474,13 +492,12 @@ impl Tree for PersyTree { @@ -474,13 +492,12 @@ impl Tree for PersyTree {
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
use persy::ByteVec;
let iter = if backwards {
self.db
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, ..ByteVec(from.to_owned()))
.unwrap()
} else {
self.db
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec(from.to_owned())..)
.unwrap()
};
@ -512,12 +529,10 @@ impl Tree for PersyTree { @@ -512,12 +529,10 @@ impl Tree for PersyTree {
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + 'a> {
use persy::ByteVec;
let range_prefix = ByteVec(prefix.to_owned());
let owned_prefix = prefix.clone();
let result = Box::new(
self.db
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..)
.unwrap()
.take_while(move |(k, _)| k.0.starts_with(&owned_prefix))

Loading…
Cancel
Save