|
|
|
|
@ -6,6 +6,9 @@ use std::{future::Future, pin::Pin, sync::Arc};
@@ -6,6 +6,9 @@ use std::{future::Future, pin::Pin, sync::Arc};
|
|
|
|
|
#[cfg(any(feature = "persy"))] |
|
|
|
|
use std::{collections::BTreeMap, sync::RwLock}; |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
use std::{cmp::Ordering, collections::BTreeSet, iter::Peekable}; |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "sled")] |
|
|
|
|
pub mod sled; |
|
|
|
|
|
|
|
|
|
@ -68,7 +71,7 @@ pub trait Tree: Send + Sync {
@@ -68,7 +71,7 @@ pub trait Tree: Send + Sync {
|
|
|
|
|
// require a massive refactor.
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
pub struct PersyEngine(persy::Persy); |
|
|
|
|
pub struct PersyEngine(persy::Persy, Arc<RwLock<WriteCache>>); |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
impl DatabaseEngine for PersyEngine { |
|
|
|
|
@ -81,7 +84,14 @@ impl DatabaseEngine for PersyEngine {
@@ -81,7 +84,14 @@ impl DatabaseEngine for PersyEngine {
|
|
|
|
|
.create(true) |
|
|
|
|
.config(cfg) |
|
|
|
|
.open(&format!("{}/db.persy", config.database_path))?; |
|
|
|
|
Ok(Arc::new(PersyEngine(persy))) |
|
|
|
|
|
|
|
|
|
let write_cache = Arc::new(RwLock::new(WriteCache { |
|
|
|
|
add_cache: Default::default(), |
|
|
|
|
remove_cache: Default::default(), |
|
|
|
|
changes_count: Default::default(), |
|
|
|
|
db: persy.clone(), |
|
|
|
|
})); |
|
|
|
|
Ok(Arc::new(PersyEngine(persy, write_cache))) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { |
|
|
|
|
@ -96,6 +106,7 @@ impl DatabaseEngine for PersyEngine {
@@ -96,6 +106,7 @@ impl DatabaseEngine for PersyEngine {
|
|
|
|
|
db: self.0.clone(), |
|
|
|
|
name: name.to_owned(), |
|
|
|
|
watchers: RwLock::new(BTreeMap::new()), |
|
|
|
|
write_cache: self.1.clone(), |
|
|
|
|
})) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -105,16 +116,277 @@ pub struct PersyTree {
@@ -105,16 +116,277 @@ pub struct PersyTree {
|
|
|
|
|
db: persy::Persy, |
|
|
|
|
name: String, |
|
|
|
|
watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>, |
|
|
|
|
write_cache: Arc<RwLock<WriteCache>>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
pub struct WriteCache { |
|
|
|
|
add_cache: BTreeMap<String, BTreeMap<Vec<u8>, Vec<u8>>>, |
|
|
|
|
remove_cache: BTreeMap<String, BTreeSet<Vec<u8>>>, |
|
|
|
|
changes_count: i32, |
|
|
|
|
db: persy::Persy, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
impl WriteCache { |
|
|
|
|
pub fn insert(&mut self, index: String, key: &[u8], value: &[u8]) -> Result<()> { |
|
|
|
|
use std::collections::btree_map::Entry; |
|
|
|
|
match self.add_cache.entry(index.clone()) { |
|
|
|
|
Entry::Vacant(s) => { |
|
|
|
|
let mut map = BTreeMap::new(); |
|
|
|
|
map.insert(key.to_owned(), value.to_owned()); |
|
|
|
|
s.insert(map); |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(mut o) => { |
|
|
|
|
o.get_mut().insert(key.to_owned(), value.to_owned()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.remove_remove(index, key)?; |
|
|
|
|
self.check_and_flush()?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn remove_remove(&mut self, index: String, key: &[u8]) -> Result<()> { |
|
|
|
|
use std::collections::btree_map::Entry; |
|
|
|
|
match self.remove_cache.entry(index) { |
|
|
|
|
Entry::Vacant(_) => {} |
|
|
|
|
Entry::Occupied(mut o) => { |
|
|
|
|
o.get_mut().remove(key); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn remove_insert(&mut self, index: String, key: &[u8]) -> Result<()> { |
|
|
|
|
use std::collections::btree_map::Entry; |
|
|
|
|
match self.add_cache.entry(index) { |
|
|
|
|
Entry::Vacant(_) => {} |
|
|
|
|
Entry::Occupied(mut o) => { |
|
|
|
|
o.get_mut().remove(key); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn remove(&mut self, index: String, key: &[u8]) -> Result<()> { |
|
|
|
|
use std::collections::btree_map::Entry; |
|
|
|
|
match self.remove_cache.entry(index.clone()) { |
|
|
|
|
Entry::Vacant(s) => { |
|
|
|
|
let mut map = BTreeSet::new(); |
|
|
|
|
map.insert(key.to_owned()); |
|
|
|
|
s.insert(map); |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(mut o) => { |
|
|
|
|
o.get_mut().insert(key.to_owned()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.remove_insert(index, key)?; |
|
|
|
|
self.check_and_flush()?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
pub fn check_and_flush(&mut self) -> Result<()> { |
|
|
|
|
self.changes_count += 1; |
|
|
|
|
if self.changes_count > 1000 { |
|
|
|
|
self.flush_changes()?; |
|
|
|
|
self.changes_count = 0; |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn get(&self, index: &str, key: &[u8], value: Option<Vec<u8>>) -> Result<Option<Vec<u8>>> { |
|
|
|
|
Ok(if let Some(changes) = self.add_cache.get(index) { |
|
|
|
|
changes.get(key).map(|v| v.to_owned()).or(value) |
|
|
|
|
} else if let Some(remove) = self.remove_cache.get(index) { |
|
|
|
|
if remove.contains(key) { |
|
|
|
|
None |
|
|
|
|
} else { |
|
|
|
|
value |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
value |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn flush_changes(&mut self) -> Result<()> { |
|
|
|
|
use persy::ByteVec; |
|
|
|
|
let mut tx = self.db.begin()?; |
|
|
|
|
|
|
|
|
|
for (index, changes) in &self.add_cache { |
|
|
|
|
for (key, value) in changes { |
|
|
|
|
tx.put::<ByteVec, ByteVec>( |
|
|
|
|
&index, |
|
|
|
|
ByteVec(key.to_owned()), |
|
|
|
|
ByteVec(value.to_owned()), |
|
|
|
|
)?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.add_cache.clear(); |
|
|
|
|
for (index, changes) in &self.remove_cache { |
|
|
|
|
for key in changes { |
|
|
|
|
tx.remove::<ByteVec, ByteVec>(&index, ByteVec(key.to_owned()), None)?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.remove_cache.clear(); |
|
|
|
|
tx.prepare()?.commit()?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn iter<'a>( |
|
|
|
|
&self, |
|
|
|
|
index: &str, |
|
|
|
|
mut iter: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + Sync + 'a>, |
|
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + Sync + 'a> { |
|
|
|
|
if let Some(adds) = self.add_cache.get(index) { |
|
|
|
|
let added = adds.clone().into_iter().map(|(k, v)| (k.into(), v.into())); |
|
|
|
|
iter = Box::new(UnionIter::new(iter, added, false)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let Some(removes) = self.remove_cache.get(index) { |
|
|
|
|
let to_filter = removes.clone(); |
|
|
|
|
iter = Box::new(iter.filter(move |x| to_filter.contains(&(*x.0).to_owned()))) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
iter |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn iter_from<'a>( |
|
|
|
|
&self, |
|
|
|
|
index: &str, |
|
|
|
|
from: &[u8], |
|
|
|
|
backwards: bool, |
|
|
|
|
mut iter: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a>, |
|
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> { |
|
|
|
|
if let Some(adds) = self.add_cache.get(index) { |
|
|
|
|
let range = if backwards { |
|
|
|
|
adds.range(..from.to_owned()) |
|
|
|
|
} else { |
|
|
|
|
adds.range(from.to_owned()..) |
|
|
|
|
}; |
|
|
|
|
let added = range |
|
|
|
|
.map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) |
|
|
|
|
.collect::<Vec<(Box<[u8]>, Box<[u8]>)>>(); |
|
|
|
|
let add_iter: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)>> = if backwards { |
|
|
|
|
Box::new(added.into_iter().rev()) |
|
|
|
|
} else { |
|
|
|
|
Box::new(added.into_iter()) |
|
|
|
|
}; |
|
|
|
|
iter = Box::new(UnionIter::new(iter, add_iter, backwards)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let Some(removes) = self.remove_cache.get(index) { |
|
|
|
|
let owned_from = from.to_owned(); |
|
|
|
|
let to_filter = removes.iter(); |
|
|
|
|
let to_filter = if backwards { |
|
|
|
|
to_filter |
|
|
|
|
.filter(|x| (..&owned_from).contains(x)) |
|
|
|
|
.cloned() |
|
|
|
|
.collect::<Vec<Vec<u8>>>() |
|
|
|
|
} else { |
|
|
|
|
to_filter |
|
|
|
|
.filter(|x| (&owned_from..).contains(x)) |
|
|
|
|
.cloned() |
|
|
|
|
.collect::<Vec<Vec<u8>>>() |
|
|
|
|
}; |
|
|
|
|
iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) |
|
|
|
|
} |
|
|
|
|
iter |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn scan_prefix<'a>( |
|
|
|
|
&self, |
|
|
|
|
index: &str, |
|
|
|
|
prefix: Vec<u8>, |
|
|
|
|
mut iter: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + 'a>, |
|
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + 'a> { |
|
|
|
|
if let Some(adds) = self.add_cache.get(index) { |
|
|
|
|
let owned_prefix = prefix.to_owned(); |
|
|
|
|
let added = adds |
|
|
|
|
.range(prefix.to_owned()..) |
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&owned_prefix)) |
|
|
|
|
.map(|(k, v)| (k.to_owned().into(), v.to_owned().into())) |
|
|
|
|
.collect::<Vec<(Box<[u8]>, Box<[u8]>)>>(); |
|
|
|
|
iter = Box::new(UnionIter::new(iter, added.into_iter(), false)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let Some(removes) = self.remove_cache.get(index) { |
|
|
|
|
let to_filter = removes |
|
|
|
|
.iter() |
|
|
|
|
.filter(move |k| k.starts_with(&prefix)) |
|
|
|
|
.cloned() |
|
|
|
|
.collect::<Vec<Vec<u8>>>(); |
|
|
|
|
iter = Box::new(iter.filter(move |x| !to_filter.contains(&(*x.0).to_owned()))) |
|
|
|
|
} |
|
|
|
|
iter |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
struct UnionIter<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> { |
|
|
|
|
first: Peekable<T>, |
|
|
|
|
second: Peekable<T1>, |
|
|
|
|
backwards: bool, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
impl<T: Iterator<Item = I>, T1: Iterator<Item = I>, I> UnionIter<T, T1, I> { |
|
|
|
|
fn new(first: T, second: T1, backwards: bool) -> Self { |
|
|
|
|
UnionIter { |
|
|
|
|
first: first.peekable(), |
|
|
|
|
second: second.peekable(), |
|
|
|
|
backwards, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
impl<K: Ord, V, T, T1> Iterator for UnionIter<T, T1, (K, V)> |
|
|
|
|
where |
|
|
|
|
T: Iterator<Item = (K, V)>, |
|
|
|
|
T1: Iterator<Item = (K, V)>, |
|
|
|
|
{ |
|
|
|
|
type Item = (K, V); |
|
|
|
|
fn next(&mut self) -> Option<Self::Item> { |
|
|
|
|
if let (Some(f), Some(s)) = (self.first.peek(), self.second.peek()) { |
|
|
|
|
if self.backwards { |
|
|
|
|
match f.0.cmp(&s.0) { |
|
|
|
|
Ordering::Less => self.second.next(), |
|
|
|
|
Ordering::Greater => self.first.next(), |
|
|
|
|
Ordering::Equal => { |
|
|
|
|
self.first.next(); |
|
|
|
|
self.second.next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
match f.0.cmp(&s.0) { |
|
|
|
|
Ordering::Less => self.first.next(), |
|
|
|
|
Ordering::Greater => self.second.next(), |
|
|
|
|
Ordering::Equal => { |
|
|
|
|
self.first.next(); |
|
|
|
|
self.second.next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
self.first.next().or_else(|| self.second.next()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "persy")] |
|
|
|
|
impl Tree for PersyTree { |
|
|
|
|
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { |
|
|
|
|
Ok(self |
|
|
|
|
use persy::ByteVec; |
|
|
|
|
let result = self |
|
|
|
|
.db |
|
|
|
|
.get::<persy::ByteVec, persy::ByteVec>(&self.name, &persy::ByteVec(key.to_vec()))? |
|
|
|
|
.get::<ByteVec, ByteVec>(&self.name, &ByteVec(key.to_vec()))? |
|
|
|
|
.map(|v| v.into_iter().map(|bv| bv.0).next()) |
|
|
|
|
.flatten()) |
|
|
|
|
.flatten(); |
|
|
|
|
let result = self |
|
|
|
|
.write_cache |
|
|
|
|
.read() |
|
|
|
|
.unwrap() |
|
|
|
|
.get(&self.name, key, result)?; |
|
|
|
|
Ok(result) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { |
|
|
|
|
@ -139,39 +411,35 @@ impl Tree for PersyTree {
@@ -139,39 +411,35 @@ impl Tree for PersyTree {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut tx = self.db.begin()?; |
|
|
|
|
tx.put::<persy::ByteVec, persy::ByteVec>( |
|
|
|
|
&self.name, |
|
|
|
|
persy::ByteVec(key.to_owned()), |
|
|
|
|
persy::ByteVec(value.to_owned()), |
|
|
|
|
)?; |
|
|
|
|
tx.prepare()?.commit()?; |
|
|
|
|
self.write_cache |
|
|
|
|
.write() |
|
|
|
|
.unwrap() |
|
|
|
|
.insert(self.name.clone(), key, value)?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn remove(&self, key: &[u8]) -> Result<()> { |
|
|
|
|
let mut tx = self.db.begin()?; |
|
|
|
|
tx.remove::<persy::ByteVec, persy::ByteVec>( |
|
|
|
|
&self.name, |
|
|
|
|
persy::ByteVec(key.to_owned()), |
|
|
|
|
None, |
|
|
|
|
)?; |
|
|
|
|
tx.prepare()?.commit()?; |
|
|
|
|
self.write_cache |
|
|
|
|
.write() |
|
|
|
|
.unwrap() |
|
|
|
|
.remove(self.name.clone(), key)?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + Sync + 'a> { |
|
|
|
|
Box::new( |
|
|
|
|
use persy::ByteVec; |
|
|
|
|
let result = Box::new( |
|
|
|
|
self.db |
|
|
|
|
.range::<persy::ByteVec, persy::ByteVec, _>(&self.name, ..) |
|
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ..) |
|
|
|
|
.unwrap() |
|
|
|
|
.filter_map(|(k, v)| { |
|
|
|
|
v.into_iter() |
|
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
|
.next() |
|
|
|
|
}), |
|
|
|
|
) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
self.write_cache.read().unwrap().iter(&self.name, result) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn iter_from<'a>( |
|
|
|
|
@ -179,19 +447,14 @@ impl Tree for PersyTree {
@@ -179,19 +447,14 @@ impl Tree for PersyTree {
|
|
|
|
|
from: &[u8], |
|
|
|
|
backwards: bool, |
|
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> { |
|
|
|
|
use persy::ByteVec; |
|
|
|
|
let iter = if backwards { |
|
|
|
|
self.db |
|
|
|
|
.range::<persy::ByteVec, persy::ByteVec, _>( |
|
|
|
|
&self.name, |
|
|
|
|
..persy::ByteVec(from.to_owned()), |
|
|
|
|
) |
|
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ..ByteVec(from.to_owned())) |
|
|
|
|
.unwrap() |
|
|
|
|
} else { |
|
|
|
|
self.db |
|
|
|
|
.range::<persy::ByteVec, persy::ByteVec, _>( |
|
|
|
|
&self.name, |
|
|
|
|
persy::ByteVec(from.to_owned()).., |
|
|
|
|
) |
|
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec(from.to_owned())..) |
|
|
|
|
.unwrap() |
|
|
|
|
}; |
|
|
|
|
let map = iter.filter_map(|(k, v)| { |
|
|
|
|
@ -199,11 +462,16 @@ impl Tree for PersyTree {
@@ -199,11 +462,16 @@ impl Tree for PersyTree {
|
|
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
|
.next() |
|
|
|
|
}); |
|
|
|
|
if backwards { |
|
|
|
|
let result: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)>> = if backwards { |
|
|
|
|
Box::new(map.rev()) |
|
|
|
|
} else { |
|
|
|
|
Box::new(map) |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
self.write_cache |
|
|
|
|
.read() |
|
|
|
|
.unwrap() |
|
|
|
|
.iter_from(&self.name, from, backwards, result) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> { |
|
|
|
|
@ -217,21 +485,26 @@ impl Tree for PersyTree {
@@ -217,21 +485,26 @@ impl Tree for PersyTree {
|
|
|
|
|
&'a self, |
|
|
|
|
prefix: Vec<u8>, |
|
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + 'a> { |
|
|
|
|
let range_prefix = persy::ByteVec(prefix.to_owned()); |
|
|
|
|
Box::new( |
|
|
|
|
use persy::ByteVec; |
|
|
|
|
|
|
|
|
|
let range_prefix = ByteVec(prefix.to_owned()); |
|
|
|
|
let owned_prefix = prefix.clone(); |
|
|
|
|
let result = Box::new( |
|
|
|
|
self.db |
|
|
|
|
.range::<persy::ByteVec, persy::ByteVec, _>( |
|
|
|
|
&self.name, |
|
|
|
|
range_prefix.., |
|
|
|
|
) |
|
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..) |
|
|
|
|
.unwrap() |
|
|
|
|
.take_while(move |(k, _)| k.0.starts_with(&prefix)) |
|
|
|
|
.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) |
|
|
|
|
.filter_map(|(k, v)| { |
|
|
|
|
v.into_iter() |
|
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
|
.next() |
|
|
|
|
}), |
|
|
|
|
) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
self.write_cache |
|
|
|
|
.read() |
|
|
|
|
.unwrap() |
|
|
|
|
.scan_prefix(&self.name, prefix, result) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
|
|
|
|
|