diff --git a/src/database.rs b/src/database.rs index 699abbd..42da1c5 100644 --- a/src/database.rs +++ b/src/database.rs @@ -139,7 +139,7 @@ pub type Engine = abstraction::sqlite::Engine; pub type Engine = abstraction::heed::Engine; #[cfg(feature = "persy")] -pub type Engine = abstraction::PersyEngine; +pub type Engine = abstraction::persy::PersyEngine; pub struct Database { _db: Arc, diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index d89a18f..b1fd87e 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -3,32 +3,12 @@ use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; -#[cfg(any(feature = "persy"))] -use std::{collections::BTreeMap, sync::RwLock}; - -#[cfg(feature = "persy")] -use std::{ - cmp::Ordering, - collections::{btree_map::Entry, BTreeSet}, - iter::Peekable, - time::{Duration, Instant}, -}; - -#[cfg(feature = "persy")] -use persy::{ByteVec, OpenOptions, Persy, ValueMode}; - -#[cfg(feature = "persy")] -use timer::Timer; - -#[cfg(feature = "sled")] -pub mod sled; - -#[cfg(feature = "sqlite")] -pub mod sqlite; - #[cfg(feature = "heed")] pub mod heed; +#[cfg(feature = "persy")] +pub mod persy; + pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; @@ -70,521 +50,3 @@ pub trait Tree: Send + Sync { } } -#[cfg(feature = "persy")] -pub struct PersyEngine { - persy: Persy, - write_cache: Arc>, - #[allow(unused)] - timer: Timer, -} - -#[cfg(feature = "persy")] -impl DatabaseEngine for PersyEngine { - fn open(config: &Config) -> Result> { - let mut cfg = persy::Config::new(); - cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); - - let persy = OpenOptions::new() - .create(true) - .config(cfg) - .open(&format!("{}/db.persy", config.database_path))?; - let write_cache_size = 1000; - let write_cache_window_millisecs = 1000; - let write_cache = Arc::new(RwLock::new(WriteCache::new( - &persy, - write_cache_size, - Duration::from_millis(write_cache_window_millisecs), - ))); - let timer = Timer::new(); - let timer_write_cache = write_cache.clone(); - timer.schedule_repeating( - chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), - move || timer_write_cache.write().unwrap().flush_timed().unwrap(), - ); - Ok(Arc::new(PersyEngine { - persy, - write_cache, - timer, - })) - } - - fn open_tree(self: &Arc, name: &'static str) -> Result> { - // Create if it doesn't exist - if !self.persy.exists_index(name)? { - let mut tx = self.persy.begin()?; - tx.create_index::(name, ValueMode::REPLACE)?; - tx.prepare()?.commit()?; - } - - Ok(Arc::new(PersyTree { - persy: self.persy.clone(), - name: name.to_owned(), - watchers: RwLock::new(BTreeMap::new()), - write_cache: self.write_cache.clone(), - })) - } - - fn flush(self: &Arc) -> Result<()> { - Ok(()) - } -} - -#[cfg(feature = "persy")] -pub struct PersyTree { - persy: Persy, - name: String, - watchers: RwLock, Vec>>>, - write_cache: Arc>, -} - -#[cfg(feature = "persy")] -pub struct WriteCache { - add_cache: BTreeMap, Vec>>, - remove_cache: BTreeMap>>, - changes_count: u32, - last_flush: Instant, - persy: Persy, - max_size: u32, - max_time_window: Duration, -} - -#[cfg(feature = "persy")] -impl WriteCache { - fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { - Self { - add_cache: Default::default(), - remove_cache: Default::default(), - changes_count: Default::default(), - last_flush: Instant::now(), - persy: persy.clone(), - max_size, - max_time_window, - } - } -} - -#[cfg(feature = "persy")] -impl WriteCache { - pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { - match self.add_cache.entry(index.clone()) { - Entry::Vacant(s) => { - let mut map = BTreeMap::new(); - map.insert(key.to_owned(), value.to_owned()); - s.insert(map); - } - Entry::Occupied(mut o) => { - o.get_mut().insert(key.to_owned(), value.to_owned()); - } - } - self.remove_remove(index, key)?; - self.check_and_flush()?; - Ok(()) - } - - pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { - match self.remove_cache.entry(index) { - Entry::Vacant(_) => {} - Entry::Occupied(mut o) => { - o.get_mut().remove(key); - } - } - Ok(()) - } - - pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { - match self.add_cache.entry(index) { - Entry::Vacant(_) => {} - Entry::Occupied(mut o) => { - o.get_mut().remove(key); - } - } - Ok(()) - } - - pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { - match self.remove_cache.entry(index.clone()) { - Entry::Vacant(s) => { - let mut map = BTreeSet::new(); - map.insert(key.to_owned()); - s.insert(map); - } - Entry::Occupied(mut o) => { - o.get_mut().insert(key.to_owned()); - } - } - self.remove_insert(index, key)?; - self.check_and_flush()?; - Ok(()) - } - pub fn check_and_flush(&mut self) -> Result<()> { - self.changes_count += 1; - if self.changes_count > self.max_size { - self.flush_changes()?; - self.changes_count = 0; - } - Ok(()) - } - - pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { - Ok(if let Some(changes) = self.add_cache.get(index) { - changes.get(key).map(|v| v.to_owned()).or(value) - } else if let Some(remove) = self.remove_cache.get(index) { - if remove.contains(key) { - None - } else { - value - } - } else { - value - }) - } - - fn flush_changes(&mut self) -> Result<()> { - let mut tx = self.persy.begin()?; - - for (index, changes) in &self.add_cache { - for (key, value) in changes { - tx.put::( - &index, - ByteVec(key.to_owned()), - ByteVec(value.to_owned()), - )?; - } - } - self.add_cache.clear(); - for (index, changes) in &self.remove_cache { - for key in changes { - tx.remove::(&index, ByteVec(key.to_owned()), None)?; - } - } - self.remove_cache.clear(); - tx.prepare()?.commit()?; - self.last_flush = Instant::now(); - Ok(()) - } - - pub fn iter<'a>( - &self, - index: &str, - mut iter: Box, Vec)> + Send + Sync + 'a>, - ) -> Box, Vec)> + Send + Sync + 'a> { - if let Some(adds) = self.add_cache.get(index) { - let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); - iter = Box::new(UnionIter::new(iter, added, false)) - } - - if let Some(removes) = self.remove_cache.get(index) { - let to_filter = removes.clone(); - iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) - } - - iter - } - - fn iter_from<'a>( - &self, - index: &str, - from: &[u8], - backwards: bool, - mut iter: Box, Vec)> + Send+ 'a>, - ) -> Box, Vec)> + Send + 'a> { - if let Some(adds) = self.add_cache.get(index) { - let range = if backwards { - adds.range(..from.to_owned()) - } else { - adds.range(from.to_owned()..) - }; - let added = range - .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) - .collect::, Vec)>>(); - let add_iter: Box, Vec)> + Send> = if backwards { - Box::new(added.into_iter().rev()) - } else { - Box::new(added.into_iter()) - }; - iter = Box::new(UnionIter::new(iter, add_iter, backwards)) - } - - if let Some(removes) = self.remove_cache.get(index) { - let owned_from = from.to_owned(); - let to_filter = removes.iter(); - let to_filter = if backwards { - to_filter - .filter(|x| (..&owned_from).contains(x)) - .cloned() - .collect::>>() - } else { - to_filter - .filter(|x| (&owned_from..).contains(x)) - .cloned() - .collect::>>() - }; - iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) - } - iter - } - - fn scan_prefix<'a>( - &self, - index: &str, - prefix: Vec, - mut iter: Box, Vec)> + Send + 'a>, - ) -> Box, Vec)> + Send + 'a> { - if let Some(adds) = self.add_cache.get(index) { - let owned_prefix = prefix.to_owned(); - let added = adds - .range(prefix.to_owned()..) - .take_while(move |(k, _)| k.starts_with(&owned_prefix)) - .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) - .collect::, Vec)>>(); - iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) - } - - if let Some(removes) = self.remove_cache.get(index) { - let to_filter = removes - .iter() - .filter(move |k| k.starts_with(&prefix)) - .cloned() - .collect::>>(); - iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) - } - iter - } - - #[allow(unused)] - pub fn flush_timed(&mut self) -> Result<()> { - if self.changes_count > 0 { - if Instant::now() - self.last_flush > self.max_time_window { - self.flush_changes()?; - } - } - Ok(()) - } -} - -#[cfg(feature = "persy")] -impl Drop for WriteCache { - fn drop(&mut self) { - if self.changes_count > 0 { - self.flush_changes().unwrap(); - } - } -} - -#[cfg(feature = "persy")] -struct UnionIter, T1: Iterator, I> { - first: Peekable, - second: Peekable, - backwards: bool, -} - -#[cfg(feature = "persy")] -impl, T1: Iterator, I> UnionIter { - fn new(first: T, second: T1, backwards: bool) -> Self { - UnionIter { - first: first.peekable(), - second: second.peekable(), - backwards, - } - } -} - -#[cfg(feature = "persy")] -impl Iterator for UnionIter -where - T: Iterator, - T1: Iterator, -{ - type Item = (K, V); - fn next(&mut self) -> Option { - if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { - if self.backwards { - match f.0.cmp(&s.0) { - Ordering::Less => self.second.next(), - Ordering::Greater => self.first.next(), - Ordering::Equal => { - self.first.next(); - self.second.next() - } - } - } else { - match f.0.cmp(&s.0) { - Ordering::Less => self.first.next(), - Ordering::Greater => self.second.next(), - Ordering::Equal => { - self.first.next(); - self.second.next() - } - } - } - } else { - self.first.next().or_else(|| self.second.next()) - } - } -} - -#[cfg(feature = "persy")] -impl Tree for PersyTree { - fn get(&self, key: &[u8]) -> Result>> { - let result = self - .persy - .get::(&self.name, &ByteVec(key.to_vec()))? - .map(|v| v.into_iter().map(|bv| bv.0).next()) - .flatten(); - let result = self - .write_cache - .read() - .unwrap() - .get(&self.name, key, result)?; - Ok(result) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } - } - } - } - self.write_cache - .write() - .unwrap() - .insert(self.name.clone(), key, value)?; - Ok(()) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - self.write_cache - .write() - .unwrap() - .remove(self.name.clone(), key)?; - Ok(()) - } - - fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { - let iter = self.persy.range::(&self.name, ..); - match iter { - Ok(iter) => { - let result = Box::new(iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - })); - - self.write_cache.read().unwrap().iter(&self.name, result) - } - Err(e) => { - log::warn!("error iterating {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn iter_from<'a>( - &'a self, - from: &[u8], - backwards: bool, - ) -> Box, Vec)> + Send + 'a> { - let range = if backwards { - self.persy - .range::(&self.name, ..ByteVec(from.to_owned())) - } else { - self.persy - .range::(&self.name, ByteVec(from.to_owned())..) - }; - match range { - Ok(iter) => { - let map = iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }); - let result: Box, Vec)> + Send+'a > = if backwards { - Box::new(map.rev()) - } else { - Box::new(map) - }; - - self.write_cache - .read() - .unwrap() - .iter_from(&self.name, from, backwards, result) - } - Err(e) => { - log::warn!("error iterating with prefix {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn increment(&self, key: &[u8]) -> Result> { - let old = self.get(key)?; - let new = crate::utils::increment(old.as_deref()).unwrap(); - self.insert(key, &new)?; - Ok(new) - } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Vec)> + Send+ 'a> { - let range_prefix = ByteVec(prefix.to_owned()); - let range = self - .persy - .range::(&self.name, range_prefix..); - - match range { - Ok(iter) => { - let owned_prefix = prefix.clone(); - let result = Box::new( - iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) - .filter_map(|(k, v)| { - v.into_iter() - .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) - .next() - }), - ); - - self.write_cache - .read() - .unwrap() - .scan_prefix(&self.name, prefix, result) - } - Err(e) => { - log::warn!("error scanning prefix {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); - - Box::pin(async move { - // Tx is never destroyed - rx.await.unwrap(); - }) - } -} diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs new file mode 100644 index 0000000..bae43e7 --- /dev/null +++ b/src/database/abstraction/persy.rs @@ -0,0 +1,526 @@ +use persy::{ByteVec, OpenOptions, Persy, ValueMode}; +use timer::Timer; +use crate::database::abstraction::{DatabaseEngine, Tree}; +use crate::database::Config; +use crate::Result; + +use std::{ + future::Future, pin::Pin, + cmp::Ordering, + collections::{BTreeMap,btree_map::Entry, BTreeSet}, + iter::Peekable, + time::{Duration, Instant}, + sync::{RwLock, Arc} +}; + +pub struct PersyEngine { + persy: Persy, + write_cache: Arc>, + #[allow(unused)] + timer: Timer, +} + +impl DatabaseEngine for PersyEngine { + fn open(config: &Config) -> Result> { + let mut cfg = persy::Config::new(); + cfg.change_cache_size(config.db_cache_capacity_mb as u64 *1048576f64 as u64); + + let persy = OpenOptions::new() + .create(true) + .config(cfg) + .open(&format!("{}/db.persy", config.database_path))?; + let write_cache_size = 1000; + let write_cache_window_millisecs = 1000; + let write_cache = Arc::new(RwLock::new(WriteCache::new( + &persy, + write_cache_size, + Duration::from_millis(write_cache_window_millisecs), + ))); + let timer = Timer::new(); + let timer_write_cache = write_cache.clone(); + timer.schedule_repeating( + chrono::Duration::milliseconds((write_cache_window_millisecs / 4) as i64), + move || timer_write_cache.write().unwrap().flush_timed().unwrap(), + ); + Ok(Arc::new(PersyEngine { + persy, + write_cache, + timer, + })) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + // Create if it doesn't exist + if !self.persy.exists_index(name)? { + let mut tx = self.persy.begin()?; + tx.create_index::(name, ValueMode::REPLACE)?; + tx.prepare()?.commit()?; + } + + Ok(Arc::new(PersyTree { + persy: self.persy.clone(), + name: name.to_owned(), + watchers: RwLock::new(BTreeMap::new()), + write_cache: self.write_cache.clone(), + })) + } + + fn flush(self: &Arc) -> Result<()> { + Ok(()) + } +} + +pub struct PersyTree { + persy: Persy, + name: String, + watchers: RwLock, Vec>>>, + write_cache: Arc>, +} + +pub struct WriteCache { + add_cache: BTreeMap, Vec>>, + remove_cache: BTreeMap>>, + changes_count: u32, + last_flush: Instant, + persy: Persy, + max_size: u32, + max_time_window: Duration, +} + +impl WriteCache { + fn new(persy: &Persy, max_size: u32, max_time_window: Duration) -> Self { + Self { + add_cache: Default::default(), + remove_cache: Default::default(), + changes_count: Default::default(), + last_flush: Instant::now(), + persy: persy.clone(), + max_size, + max_time_window, + } + } +} + +impl WriteCache { + pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { + match self.add_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeMap::new(); + map.insert(key.to_owned(), value.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned(), value.to_owned()); + } + } + self.remove_remove(index, key)?; + self.check_and_flush()?; + Ok(()) + } + + pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { + match self.remove_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { + match self.add_cache.entry(index) { + Entry::Vacant(_) => {} + Entry::Occupied(mut o) => { + o.get_mut().remove(key); + } + } + Ok(()) + } + + pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { + match self.remove_cache.entry(index.clone()) { + Entry::Vacant(s) => { + let mut map = BTreeSet::new(); + map.insert(key.to_owned()); + s.insert(map); + } + Entry::Occupied(mut o) => { + o.get_mut().insert(key.to_owned()); + } + } + self.remove_insert(index, key)?; + self.check_and_flush()?; + Ok(()) + } + pub fn check_and_flush(&mut self) -> Result<()> { + self.changes_count += 1; + if self.changes_count > self.max_size { + self.flush_changes()?; + self.changes_count = 0; + } + Ok(()) + } + + pub fn get(&self, index: &str, key: &[u8], value: Option>) -> Result>> { + Ok(if let Some(changes) = self.add_cache.get(index) { + changes.get(key).map(|v| v.to_owned()).or(value) + } else if let Some(remove) = self.remove_cache.get(index) { + if remove.contains(key) { + None + } else { + value + } + } else { + value + }) + } + + fn flush_changes(&mut self) -> Result<()> { + let mut tx = self.persy.begin()?; + + for (index, changes) in &self.add_cache { + for (key, value) in changes { + tx.put::( + &index, + ByteVec(key.to_owned()), + ByteVec(value.to_owned()), + )?; + } + } + self.add_cache.clear(); + for (index, changes) in &self.remove_cache { + for key in changes { + tx.remove::(&index, ByteVec(key.to_owned()), None)?; + } + } + self.remove_cache.clear(); + tx.prepare()?.commit()?; + self.last_flush = Instant::now(); + Ok(()) + } + + pub fn iter<'a>( + &self, + index: &str, + mut iter: Box, Vec)> + Send + Sync + 'a>, + ) -> Box, Vec)> + Send + Sync + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); + iter = Box::new(UnionIter::new(iter, added, false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes.clone(); + iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) + } + + iter + } + + fn iter_from<'a>( + &self, + index: &str, + from: &[u8], + backwards: bool, + mut iter: Box, Vec)> + Send+ 'a>, + ) -> Box, Vec)> + Send + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let range = if backwards { + adds.range(..from.to_owned()) + } else { + adds.range(from.to_owned()..) + }; + let added = range + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Vec)>>(); + let add_iter: Box, Vec)> + Send> = if backwards { + Box::new(added.into_iter().rev()) + } else { + Box::new(added.into_iter()) + }; + iter = Box::new(UnionIter::new(iter, add_iter, backwards)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let owned_from = from.to_owned(); + let to_filter = removes.iter(); + let to_filter = if backwards { + to_filter + .filter(|x| (..&owned_from).contains(x)) + .cloned() + .collect::>>() + } else { + to_filter + .filter(|x| (&owned_from..).contains(x)) + .cloned() + .collect::>>() + }; + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } + + fn scan_prefix<'a>( + &self, + index: &str, + prefix: Vec, + mut iter: Box, Vec)> + Send + 'a>, + ) -> Box, Vec)> + Send + 'a> { + if let Some(adds) = self.add_cache.get(index) { + let owned_prefix = prefix.to_owned(); + let added = adds + .range(prefix.to_owned()..) + .take_while(move |(k, _)| k.starts_with(&owned_prefix)) + .map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) + .collect::, Vec)>>(); + iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) + } + + if let Some(removes) = self.remove_cache.get(index) { + let to_filter = removes + .iter() + .filter(move |k| k.starts_with(&prefix)) + .cloned() + .collect::>>(); + iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) + } + iter + } + + #[allow(unused)] + pub fn flush_timed(&mut self) -> Result<()> { + if self.changes_count > 0 { + if Instant::now() - self.last_flush > self.max_time_window { + self.flush_changes()?; + } + } + Ok(()) + } +} + +#[cfg(feature = "persy")] +impl Drop for WriteCache { + fn drop(&mut self) { + if self.changes_count > 0 { + self.flush_changes().unwrap(); + } + } +} + +#[cfg(feature = "persy")] +struct UnionIter, T1: Iterator, I> { + first: Peekable, + second: Peekable, + backwards: bool, +} + +#[cfg(feature = "persy")] +impl, T1: Iterator, I> UnionIter { + fn new(first: T, second: T1, backwards: bool) -> Self { + UnionIter { + first: first.peekable(), + second: second.peekable(), + backwards, + } + } +} + +#[cfg(feature = "persy")] +impl Iterator for UnionIter +where + T: Iterator, + T1: Iterator, +{ + type Item = (K, V); + fn next(&mut self) -> Option { + if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { + if self.backwards { + match f.0.cmp(&s.0) { + Ordering::Less => self.second.next(), + Ordering::Greater => self.first.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } else { + match f.0.cmp(&s.0) { + Ordering::Less => self.first.next(), + Ordering::Greater => self.second.next(), + Ordering::Equal => { + self.first.next(); + self.second.next() + } + } + } + } else { + self.first.next().or_else(|| self.second.next()) + } + } +} + +impl Tree for PersyTree { + fn get(&self, key: &[u8]) -> Result>> { + let result = self + .persy + .get::(&self.name, &ByteVec(key.to_vec()))? + .map(|v| v.into_iter().map(|bv| bv.0).next()) + .flatten(); + let result = self + .write_cache + .read() + .unwrap() + .get(&self.name, key, result)?; + Ok(result) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + self.write_cache + .write() + .unwrap() + .insert(self.name.clone(), key, value)?; + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + self.write_cache + .write() + .unwrap() + .remove(self.name.clone(), key)?; + Ok(()) + } + + fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { + let iter = self.persy.range::(&self.name, ..); + match iter { + Ok(iter) => { + let result = Box::new(iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + })); + + self.write_cache.read().unwrap().iter(&self.name, result) + } + Err(e) => { + log::warn!("error iterating {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + Send + 'a> { + let range = if backwards { + self.persy + .range::(&self.name, ..ByteVec(from.to_owned())) + } else { + self.persy + .range::(&self.name, ByteVec(from.to_owned())..) + }; + match range { + Ok(iter) => { + let map = iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }); + let result: Box, Vec)> + Send+'a > = if backwards { + Box::new(map.rev()) + } else { + Box::new(map) + }; + + self.write_cache + .read() + .unwrap() + .iter_from(&self.name, from, backwards, result) + } + Err(e) => { + log::warn!("error iterating with prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + fn increment(&self, key: &[u8]) -> Result> { + let old = self.get(key)?; + let new = crate::utils::increment(old.as_deref()).unwrap(); + self.insert(key, &new)?; + Ok(new) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + Send+ 'a> { + let range_prefix = ByteVec(prefix.to_owned()); + let range = self + .persy + .range::(&self.name, range_prefix..); + + match range { + Ok(iter) => { + let owned_prefix = prefix.clone(); + let result = Box::new( + iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) + .next() + }), + ); + + self.write_cache + .read() + .unwrap() + .scan_prefix(&self.name, prefix, result) + } + Err(e) => { + log::warn!("error scanning prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +}