Browse Source

initial

merge-requests/200/head
Jonathan de Jong 4 years ago
parent
commit
6dd7294c6b
  1. 7
      Cargo.lock
  2. 1
      Cargo.toml
  3. 7
      src/database.rs
  4. 19
      src/database/rooms.rs
  5. 274
      src/utils.rs

7
Cargo.lock generated

@ -230,6 +230,12 @@ dependencies = [ @@ -230,6 +230,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "clru"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "218d6bd3dde8e442a975fa1cd233c0e5fded7596bccfe39f58eca98d22421e0a"
[[package]]
name = "color_quant"
version = "1.1.0"
@ -242,6 +248,7 @@ version = "0.2.0" @@ -242,6 +248,7 @@ version = "0.2.0"
dependencies = [
"base64 0.13.0",
"bytes",
"clru",
"crossbeam",
"directories",
"heed",

1
Cargo.toml

@ -79,6 +79,7 @@ num_cpus = "1.13.0" @@ -79,6 +79,7 @@ num_cpus = "1.13.0"
threadpool = "1.8.1"
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
thread_local = "1.1.3"
clru = "0.5.0"
[features]
default = ["conduit_bin", "backend_sqlite"]

7
src/database.rs

@ -48,7 +48,7 @@ pub struct Config { @@ -48,7 +48,7 @@ pub struct Config {
#[serde(default = "default_db_cache_capacity_mb")]
db_cache_capacity_mb: f64,
#[serde(default = "default_pdu_cache_capacity")]
pdu_cache_capacity: u32,
pdu_cache_capacity: usize,
#[serde(default = "default_sqlite_wal_clean_second_interval")]
sqlite_wal_clean_second_interval: u32,
#[serde(default = "default_max_request_size")]
@ -109,7 +109,7 @@ fn default_db_cache_capacity_mb() -> f64 { @@ -109,7 +109,7 @@ fn default_db_cache_capacity_mb() -> f64 {
200.0
}
fn default_pdu_cache_capacity() -> u32 {
fn default_pdu_cache_capacity() -> usize {
100_000
}
@ -292,11 +292,12 @@ impl Database { @@ -292,11 +292,12 @@ impl Database {
softfailedeventids: builder.open_tree("softfailedeventids")?,
referencedevents: builder.open_tree("referencedevents")?,
pdu_cache: Mutex::new(LruCache::new(
pdu_cache: Mutex::new(clru::CLruCache::with_scale(
config
.pdu_cache_capacity
.try_into()
.expect("pdu cache capacity fits into usize"),
utils::scale::ConduitScale,
)),
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),

19
src/database/rooms.rs

@ -96,7 +96,14 @@ pub struct Rooms { @@ -96,7 +96,14 @@ pub struct Rooms {
/// RoomId + EventId -> Parent PDU EventId.
pub(super) referencedevents: Arc<dyn Tree>,
pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>,
pub(super) pdu_cache: Mutex<
clru::CLruCache<
EventId,
Arc<PduEvent>,
std::collections::hash_map::RandomState,
utils::scale::ConduitScale,
>,
>,
pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>,
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,
pub(super) eventidshort_cache: Mutex<LruCache<EventId, u64>>,
@ -1081,7 +1088,7 @@ impl Rooms { @@ -1081,7 +1088,7 @@ impl Rooms {
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[tracing::instrument(skip(self))]
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<Arc<PduEvent>>> {
if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(&event_id) {
if let Some(p) = self.pdu_cache.lock().unwrap().get(&event_id) {
return Ok(Some(Arc::clone(p)));
}
@ -1106,10 +1113,14 @@ impl Rooms { @@ -1106,10 +1113,14 @@ impl Rooms {
})
.transpose()?
{
self.pdu_cache
if let Err(_) = self
.pdu_cache
.lock()
.unwrap()
.insert(event_id.clone(), Arc::clone(&pdu));
.put_with_weight(event_id.clone(), Arc::clone(&pdu))
{
tracing::warn!("PDU too big to fit in cache ({}), not cached, leaked.", &event_id)
};
Ok(Some(pdu))
} else {
Ok(None)

274
src/utils.rs

@ -141,3 +141,277 @@ pub fn deserialize_from_str< @@ -141,3 +141,277 @@ pub fn deserialize_from_str<
}
deserializer.deserialize_str(Visitor(std::marker::PhantomData))
}
pub mod scale {
use std::{collections::BTreeMap, mem::size_of, sync::Arc};
use clru::WeightScale;
use ruma::{EventId, RoomId, UserId};
use crate::PduEvent;
// This trait captures a plain size_of of a particular value,
// but also grabs all "hidden owned memory" behind it dynamically
pub trait Weighted {
fn size_of() -> usize;
// inner values that aren't taken into account with size_of
//
// total memory use = size_of() + inner()
fn inner(&self) -> usize;
}
pub trait WeightedExt {
fn weight(&self) -> usize;
}
impl<T> WeightedExt for T
where
T: Weighted + ?Sized,
{
fn weight(&self) -> usize {
Self::size_of() + self.inner()
}
}
impl<T> Weighted for Arc<T>
where
T: Weighted,
{
fn size_of() -> usize {
size_of::<Arc<T>>()
}
fn inner(&self) -> usize {
let i = self.as_ref();
i.weight()
}
}
impl<T> Weighted for Option<T>
where
T: Weighted,
{
fn size_of() -> usize {
size_of::<Option<T>>()
}
fn inner(&self) -> usize {
if let Some(i) = self {
i.inner()
} else {
0
}
}
}
impl<T: ?Sized> Weighted for Box<T>
where
T: Weighted,
{
fn size_of() -> usize {
size_of::<Box<T>>()
}
fn inner(&self) -> usize {
(self as &T).weight()
}
}
// impl Weighted for Box<ruma::ServerName> {
// fn size_of() -> usize {
// size_of::<Box<ruma::ServerName>>()
// }
// fn inner(&self) -> usize {
// (self as &ruma::ServerName).weight()
// }
// }
impl<T> Weighted for Vec<T>
where
T: Weighted,
{
fn size_of() -> usize {
size_of::<Vec<T>>()
}
fn inner(&self) -> usize {
(T::size_of() * self.len()) + self.iter().fold(0, |i, t| i + t.inner())
}
}
impl<A, B> Weighted for BTreeMap<A, B>
where
A: Weighted,
B: Weighted,
{
fn size_of() -> usize {
size_of::<BTreeMap<A, B>>()
}
// FIXME: technically we have all values of key/value pairs,
// but btree has a lot of internal nodeleafs andsoforth, so this isn't everything
fn inner(&self) -> usize {
self.iter().fold(0, |a, (k, v)| a + k.weight() + v.weight())
}
}
impl Weighted for u64 {
fn size_of() -> usize {
size_of::<u64>()
}
fn inner(&self) -> usize {
0
}
}
impl Weighted for str {
fn size_of() -> usize {
0 // str is a transmutation of &[u8]
}
fn inner(&self) -> usize {
self.as_bytes().len()
}
}
impl Weighted for String {
fn size_of() -> usize {
size_of::<String>()
}
fn inner(&self) -> usize {
self.as_str().inner()
}
}
impl Weighted for ruma::ServerName {
fn size_of() -> usize {
0 // ServerName is effectively str
}
fn inner(&self) -> usize {
self.as_str().inner()
}
}
impl Weighted for EventId {
fn size_of() -> usize {
size_of::<EventId>()
}
// reconstructing the inner Box<str>
fn inner(&self) -> usize {
1 // stripped $ symbol
+ self.localpart().len() // everything up until :, or the end
+ if let Some(server) = self.server_name() {
1 // stripped : symbol
+ server.inner() // everything from after : to the end
} else {
0
}
}
}
impl Weighted for RoomId {
fn size_of() -> usize {
size_of::<RoomId>()
}
// reconstructing the inner Box<str>
fn inner(&self) -> usize {
1 // stripped ! symbol
+ self.localpart().len() // everything up until :
+ 1 // stripped : symbol
+ self.server_name().inner() // everything from after : to the end
}
}
impl Weighted for UserId {
fn size_of() -> usize {
size_of::<UserId>()
}
// reconstructing the inner Box<str>
fn inner(&self) -> usize {
1 // stripped @ symbol
+ self.localpart().len() // everything up until :
+ 1 // stripped : symbol
+ self.server_name().inner() // everything from after : to the end
}
}
impl Weighted for ruma::events::pdu::EventHash {
fn size_of() -> usize {
size_of::<ruma::events::pdu::EventHash>()
}
// reconstructing the inner Box<str>
fn inner(&self) -> usize {
self.sha256.inner()
}
}
impl<A, B: ?Sized> Weighted for ruma::KeyId<A, B> {
fn size_of() -> usize {
size_of::<ruma::KeyId<A, B>>()
}
fn inner(&self) -> usize {
self.as_str().inner()
}
}
impl Weighted for serde_json::Value {
fn size_of() -> usize {
size_of::<serde_json::Value>()
}
fn inner(&self) -> usize {
use serde_json::Value;
match self {
Value::String(s) => s.inner(),
Value::Array(a) => a.inner(),
Value::Object(o) => o
.into_iter()
.fold(0, |a, (s, v)| a + s.weight() + v.weight()),
_ => 0,
}
}
}
impl Weighted for PduEvent {
fn size_of() -> usize {
size_of::<PduEvent>()
}
fn inner(&self) -> usize {
self.event_id.inner()
+ self.room_id.inner()
+ self.sender.inner()
+ self.content.inner()
+ self.prev_events.inner()
+ self.auth_events.inner()
+ self.redacts.inner()
+ self.unsigned.inner()
+ self.hashes.inner()
+ self.signatures.inner()
}
}
pub(crate) struct ConduitScale;
impl<A, B> WeightScale<A, B> for ConduitScale
where
A: Weighted,
B: Weighted,
{
fn weight(&self, key: &A, value: &B) -> usize {
key.weight() + value.weight()
}
}
}

Loading…
Cancel
Save