|
|
|
@ -473,18 +473,22 @@ impl Tree for PersyTree { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + Sync + 'a> { |
|
|
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + Sync + 'a> { |
|
|
|
let result = Box::new( |
|
|
|
let iter = self.persy.range::<ByteVec, ByteVec, _>(&self.name, ..); |
|
|
|
self.persy |
|
|
|
match iter { |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ..) |
|
|
|
Ok(iter) => { |
|
|
|
.unwrap() |
|
|
|
let result = Box::new(iter.filter_map(|(k, v)| { |
|
|
|
.filter_map(|(k, v)| { |
|
|
|
|
|
|
|
v.into_iter() |
|
|
|
v.into_iter() |
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
.next() |
|
|
|
.next() |
|
|
|
}), |
|
|
|
})); |
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.write_cache.read().unwrap().iter(&self.name, result) |
|
|
|
self.write_cache.read().unwrap().iter(&self.name, result) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Err(e) => { |
|
|
|
|
|
|
|
log::warn!("error iterating {:?}", e); |
|
|
|
|
|
|
|
Box::new(std::iter::empty()) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn iter_from<'a>( |
|
|
|
fn iter_from<'a>( |
|
|
|
@ -492,30 +496,36 @@ impl Tree for PersyTree { |
|
|
|
from: &[u8], |
|
|
|
from: &[u8], |
|
|
|
backwards: bool, |
|
|
|
backwards: bool, |
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> { |
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> { |
|
|
|
let iter = if backwards { |
|
|
|
let range = if backwards { |
|
|
|
self.persy |
|
|
|
self.persy |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ..ByteVec(from.to_owned())) |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ..ByteVec(from.to_owned())) |
|
|
|
.unwrap() |
|
|
|
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
self.persy |
|
|
|
self.persy |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec(from.to_owned())..) |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec(from.to_owned())..) |
|
|
|
.unwrap() |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
let map = iter.filter_map(|(k, v)| { |
|
|
|
|
|
|
|
v.into_iter() |
|
|
|
|
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
|
|
|
|
.next() |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
let result: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)>> = if backwards { |
|
|
|
|
|
|
|
Box::new(map.rev()) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
Box::new(map) |
|
|
|
|
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
match range { |
|
|
|
self.write_cache |
|
|
|
Ok(iter) => { |
|
|
|
.read() |
|
|
|
let map = iter.filter_map(|(k, v)| { |
|
|
|
.unwrap() |
|
|
|
v.into_iter() |
|
|
|
.iter_from(&self.name, from, backwards, result) |
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
|
|
|
|
.next() |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
let result: Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)>> = if backwards { |
|
|
|
|
|
|
|
Box::new(map.rev()) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
Box::new(map) |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.write_cache |
|
|
|
|
|
|
|
.read() |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.iter_from(&self.name, from, backwards, result) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Err(e) => { |
|
|
|
|
|
|
|
log::warn!("error iterating with prefix {:?}", e); |
|
|
|
|
|
|
|
Box::new(std::iter::empty()) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> { |
|
|
|
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> { |
|
|
|
@ -530,23 +540,32 @@ impl Tree for PersyTree { |
|
|
|
prefix: Vec<u8>, |
|
|
|
prefix: Vec<u8>, |
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + 'a> { |
|
|
|
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + Send + 'a> { |
|
|
|
let range_prefix = ByteVec(prefix.to_owned()); |
|
|
|
let range_prefix = ByteVec(prefix.to_owned()); |
|
|
|
let owned_prefix = prefix.clone(); |
|
|
|
let range = self |
|
|
|
let result = Box::new( |
|
|
|
.persy |
|
|
|
self.persy |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..); |
|
|
|
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..) |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
match range { |
|
|
|
.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) |
|
|
|
Ok(iter) => { |
|
|
|
.filter_map(|(k, v)| { |
|
|
|
let owned_prefix = prefix.clone(); |
|
|
|
v.into_iter() |
|
|
|
let result = Box::new( |
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
iter.take_while(move |(k, _)| k.0.starts_with(&owned_prefix)) |
|
|
|
.next() |
|
|
|
.filter_map(|(k, v)| { |
|
|
|
}), |
|
|
|
v.into_iter() |
|
|
|
); |
|
|
|
.map(|val| (k.0.to_owned().into(), val.0.to_owned().into())) |
|
|
|
|
|
|
|
.next() |
|
|
|
self.write_cache |
|
|
|
}), |
|
|
|
.read() |
|
|
|
); |
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.scan_prefix(&self.name, prefix, result) |
|
|
|
self.write_cache |
|
|
|
|
|
|
|
.read() |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.scan_prefix(&self.name, prefix, result) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Err(e) => { |
|
|
|
|
|
|
|
log::warn!("error scanning prefix {:?}", e); |
|
|
|
|
|
|
|
Box::new(std::iter::empty()) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
|
|
|
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
|
|
|
|