diff --git a/Cargo.lock b/Cargo.lock index 293bcff..d21210e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -230,6 +230,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "clru" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "218d6bd3dde8e442a975fa1cd233c0e5fded7596bccfe39f58eca98d22421e0a" + [[package]] name = "color_quant" version = "1.1.0" @@ -242,6 +248,7 @@ version = "0.2.0" dependencies = [ "base64 0.13.0", "bytes", + "clru", "crossbeam", "directories", "heed", diff --git a/Cargo.toml b/Cargo.toml index 13a7af4..1301179 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ num_cpus = "1.13.0" threadpool = "1.8.1" heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } thread_local = "1.1.3" +clru = "0.5.0" [features] default = ["conduit_bin", "backend_sqlite"] diff --git a/src/database.rs b/src/database.rs index 8cf4f64..fb2874b 100644 --- a/src/database.rs +++ b/src/database.rs @@ -48,7 +48,7 @@ pub struct Config { #[serde(default = "default_db_cache_capacity_mb")] db_cache_capacity_mb: f64, #[serde(default = "default_pdu_cache_capacity")] - pdu_cache_capacity: u32, + pdu_cache_capacity: usize, #[serde(default = "default_sqlite_wal_clean_second_interval")] sqlite_wal_clean_second_interval: u32, #[serde(default = "default_max_request_size")] @@ -111,7 +111,7 @@ fn default_db_cache_capacity_mb() -> f64 { 200.0 } -fn default_pdu_cache_capacity() -> u32 { +fn default_pdu_cache_capacity() -> usize { 100_000 } @@ -294,11 +294,12 @@ impl Database { softfailedeventids: builder.open_tree("softfailedeventids")?, referencedevents: builder.open_tree("referencedevents")?, - pdu_cache: Mutex::new(LruCache::new( + pdu_cache: Mutex::new(clru::CLruCache::with_scale( config .pdu_cache_capacity .try_into() .expect("pdu cache capacity fits into usize"), + utils::scale::ConduitScale, )), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), diff --git a/src/database/rooms.rs b/src/database/rooms.rs index c5b795b..53f5dd0 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -107,7 +107,14 @@ pub struct Rooms { /// RoomId + EventId -> Parent PDU EventId. pub(super) referencedevents: Arc, - pub(super) pdu_cache: Mutex>>, + pub(super) pdu_cache: Mutex< + clru::CLruCache< + EventId, + Arc, + std::collections::hash_map::RandomState, + utils::scale::ConduitScale, + >, + >, pub(super) shorteventid_cache: Mutex>>, pub(super) auth_chain_cache: Mutex, Arc>>>, pub(super) eventidshort_cache: Mutex>, @@ -1087,7 +1094,7 @@ impl Rooms { /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. #[tracing::instrument(skip(self))] pub fn get_pdu(&self, event_id: &EventId) -> Result>> { - if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) { + if let Some(p) = self.pdu_cache.lock().unwrap().get(event_id) { return Ok(Some(Arc::clone(p))); } @@ -1109,10 +1116,14 @@ impl Rooms { }) .transpose()? { - self.pdu_cache + if let Err(_) = self + .pdu_cache .lock() .unwrap() - .insert(event_id.clone(), Arc::clone(&pdu)); + .put_with_weight(event_id.clone(), Arc::clone(&pdu)) + { + tracing::warn!("PDU too big to fit in cache ({}), not cached, leaked.", &event_id) + }; Ok(Some(pdu)) } else { Ok(None) diff --git a/src/utils.rs b/src/utils.rs index 26d71a8..20a8e90 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -141,3 +141,277 @@ pub fn deserialize_from_str< } deserializer.deserialize_str(Visitor(std::marker::PhantomData)) } + +pub mod scale { + use std::{collections::BTreeMap, mem::size_of, sync::Arc}; + + use clru::WeightScale; + use ruma::{EventId, RoomId, UserId}; + + use crate::PduEvent; + + // This trait captures a plain size_of of a particular value, + // but also grabs all "hidden owned memory" behind it dynamically + pub trait Weighted { + fn size_of() -> usize; + + // inner values that aren't taken into account with size_of + // + // total memory use = size_of() + inner() + fn inner(&self) -> usize; + } + + pub trait WeightedExt { + fn weight(&self) -> usize; + } + + impl WeightedExt for T + where + T: Weighted + ?Sized, + { + fn weight(&self) -> usize { + Self::size_of() + self.inner() + } + } + + impl Weighted for Arc + where + T: Weighted, + { + fn size_of() -> usize { + size_of::>() + } + + fn inner(&self) -> usize { + let i = self.as_ref(); + + i.weight() + } + } + + impl Weighted for Option + where + T: Weighted, + { + fn size_of() -> usize { + size_of::>() + } + + fn inner(&self) -> usize { + if let Some(i) = self { + i.inner() + } else { + 0 + } + } + } + + impl Weighted for Box + where + T: Weighted, + { + fn size_of() -> usize { + size_of::>() + } + + fn inner(&self) -> usize { + (self as &T).weight() + } + } + + // impl Weighted for Box { + // fn size_of() -> usize { + // size_of::>() + // } + + // fn inner(&self) -> usize { + // (self as &ruma::ServerName).weight() + // } + // } + + impl Weighted for Vec + where + T: Weighted, + { + fn size_of() -> usize { + size_of::>() + } + + fn inner(&self) -> usize { + (T::size_of() * self.len()) + self.iter().fold(0, |i, t| i + t.inner()) + } + } + + impl Weighted for BTreeMap + where + A: Weighted, + B: Weighted, + { + fn size_of() -> usize { + size_of::>() + } + + // FIXME: technically we have all values of key/value pairs, + // but btree has a lot of internal nodeleafs andsoforth, so this isn't everything + fn inner(&self) -> usize { + self.iter().fold(0, |a, (k, v)| a + k.weight() + v.weight()) + } + } + + impl Weighted for u64 { + fn size_of() -> usize { + size_of::() + } + + fn inner(&self) -> usize { + 0 + } + } + + impl Weighted for str { + fn size_of() -> usize { + 0 // str is a transmutation of &[u8] + } + + fn inner(&self) -> usize { + self.as_bytes().len() + } + } + + impl Weighted for String { + fn size_of() -> usize { + size_of::() + } + + fn inner(&self) -> usize { + self.as_str().inner() + } + } + + impl Weighted for ruma::ServerName { + fn size_of() -> usize { + 0 // ServerName is effectively str + } + + fn inner(&self) -> usize { + self.as_str().inner() + } + } + + impl Weighted for EventId { + fn size_of() -> usize { + size_of::() + } + + // reconstructing the inner Box + fn inner(&self) -> usize { + 1 // stripped $ symbol + + self.localpart().len() // everything up until :, or the end + + if let Some(server) = self.server_name() { + 1 // stripped : symbol + + server.inner() // everything from after : to the end + } else { + 0 + } + } + } + + impl Weighted for RoomId { + fn size_of() -> usize { + size_of::() + } + + // reconstructing the inner Box + fn inner(&self) -> usize { + 1 // stripped ! symbol + + self.localpart().len() // everything up until : + + 1 // stripped : symbol + + self.server_name().inner() // everything from after : to the end + } + } + + impl Weighted for UserId { + fn size_of() -> usize { + size_of::() + } + + // reconstructing the inner Box + fn inner(&self) -> usize { + 1 // stripped @ symbol + + self.localpart().len() // everything up until : + + 1 // stripped : symbol + + self.server_name().inner() // everything from after : to the end + } + } + + impl Weighted for ruma::events::pdu::EventHash { + fn size_of() -> usize { + size_of::() + } + + // reconstructing the inner Box + fn inner(&self) -> usize { + self.sha256.inner() + } + } + + impl Weighted for ruma::KeyId { + fn size_of() -> usize { + size_of::>() + } + + fn inner(&self) -> usize { + self.as_str().inner() + } + } + + impl Weighted for serde_json::Value { + fn size_of() -> usize { + size_of::() + } + + fn inner(&self) -> usize { + use serde_json::Value; + + match self { + Value::String(s) => s.inner(), + Value::Array(a) => a.inner(), + Value::Object(o) => o + .into_iter() + .fold(0, |a, (s, v)| a + s.weight() + v.weight()), + _ => 0, + } + } + } + + impl Weighted for PduEvent { + fn size_of() -> usize { + size_of::() + } + + fn inner(&self) -> usize { + self.event_id.inner() + + self.room_id.inner() + + self.sender.inner() + + self.content.inner() + + self.prev_events.inner() + + self.auth_events.inner() + + self.redacts.inner() + + self.unsigned.inner() + + self.hashes.inner() + + self.signatures.inner() + } + } + + pub(crate) struct ConduitScale; + + impl WeightScale for ConduitScale + where + A: Weighted, + B: Weighted, + { + fn weight(&self, key: &A, value: &B) -> usize { + key.weight() + value.weight() + } + } +}