From 9588f3a3194209e9842258acf7b6fa399343a6df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 16 Oct 2021 15:19:25 +0200 Subject: [PATCH 01/11] improvement: allow rocksdb again --- Cargo.lock | 113 +++++++++++++++ Cargo.toml | 2 + src/database.rs | 11 +- src/database/abstraction.rs | 3 + src/database/abstraction/rocksdb.rs | 215 ++++++++++++++++++++++++++++ src/database/abstraction/sqlite.rs | 14 +- src/error.rs | 6 + src/utils.rs | 11 ++ 8 files changed, 364 insertions(+), 11 deletions(-) create mode 100644 src/database/abstraction/rocksdb.rs diff --git a/Cargo.lock b/Cargo.lock index fbf4b3f..3fbcbe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,25 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.59.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -205,6 +224,15 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -230,6 +258,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "clang-sys" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa66045b9cb23c2e9c1520732030608b02ee07e5cfaa5a521ec15ded7fa24c90" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "color_quant" version = "1.1.0" @@ -259,6 +298,7 @@ dependencies = [ "reqwest", "ring", "rocket", + "rocksdb", "ruma", "rusqlite", "rust-argon2", @@ -1167,12 +1207,40 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" +[[package]] +name = "libloading" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52" +dependencies = [ + "cfg-if 1.0.0", + "winapi", +] + +[[package]] +name = "librocksdb-sys" +version = "6.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c309a9d2470844aceb9a4a098cf5286154d20596868b75a6b36357d2bb9ca25d" +dependencies = [ + "bindgen", + "cc", + "glob", + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.22.2" @@ -1289,6 +1357,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.3.7" @@ -1340,6 +1414,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1539,6 +1624,12 @@ dependencies = [ "syn", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "pem" version = "0.8.3" @@ -1981,6 +2072,16 @@ dependencies = [ "uncased", ] +[[package]] +name = "rocksdb" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "ruma" version = "0.4.0" @@ -2266,6 +2367,12 @@ dependencies = [ "crossbeam-utils 0.8.5", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.2.3" @@ -2478,6 +2585,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + [[package]] name = "signal-hook-registry" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 02159e3..2ab993a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ crossbeam = { version = "0.8.1", optional = true } num_cpus = "1.13.0" threadpool = "1.8.1" heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } +rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"], optional = true } thread_local = "1.1.3" # used for TURN server authentication hmac = "0.11.0" @@ -88,6 +89,7 @@ default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] +backend_rocksdb = ["rocksdb"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] conduit_bin = [] # TODO: add rocket to this when it is optional diff --git a/src/database.rs b/src/database.rs index 84ca68d..a0f5cb5 100644 --- a/src/database.rs +++ b/src/database.rs @@ -154,6 +154,9 @@ pub type Engine = abstraction::sqlite::Engine; #[cfg(feature = "heed")] pub type Engine = abstraction::heed::Engine; +#[cfg(feature = "rocksdb")] +pub type Engine = abstraction::rocksdb::Engine; + pub struct Database { _db: Arc, pub globals: globals::Globals, @@ -315,10 +318,10 @@ impl Database { .expect("pdu cache capacity fits into usize"), )), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), - shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), - eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), - shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), - statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), + shorteventid_cache: Mutex::new(LruCache::new(100_000_000)), + eventidshort_cache: Mutex::new(LruCache::new(100_000_000)), + shortstatekey_cache: Mutex::new(LruCache::new(100_000_000)), + statekeyshort_cache: Mutex::new(LruCache::new(100_000_000)), our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), stateinfo_cache: Mutex::new(LruCache::new(1000)), diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 11bbc3b..4c7f1d2 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -12,6 +12,9 @@ pub mod sqlite; #[cfg(feature = "heed")] pub mod heed; +#[cfg(feature = "rocksdb")] +pub mod rocksdb; + pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs new file mode 100644 index 0000000..a8077f0 --- /dev/null +++ b/src/database/abstraction/rocksdb.rs @@ -0,0 +1,215 @@ +use super::super::Config; +use crate::{utils, Result}; + +use std::{future::Future, pin::Pin, sync::Arc}; + +use super::{DatabaseEngine, Tree}; + +use std::{collections::HashMap, sync::RwLock}; + +pub struct Engine { + rocks: rocksdb::DBWithThreadMode, + old_cfs: Vec, +} + +pub struct RocksDbEngineTree<'a> { + db: Arc, + name: &'a str, + watchers: RwLock, Vec>>>, +} + +impl DatabaseEngine for Engine { + fn open(config: &Config) -> Result> { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.set_max_open_files(16); + db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); + db_opts.set_target_file_size_base(256 << 20); + db_opts.set_write_buffer_size(256 << 20); + + let mut block_based_options = rocksdb::BlockBasedOptions::default(); + block_based_options.set_block_size(512 << 10); + db_opts.set_block_based_table_factory(&block_based_options); + + let cfs = rocksdb::DBWithThreadMode::::list_cf( + &db_opts, + &config.database_path, + ) + .unwrap_or_default(); + + let db = rocksdb::DBWithThreadMode::::open_cf_descriptors( + &db_opts, + &config.database_path, + cfs.iter().map(|name| { + let mut options = rocksdb::Options::default(); + let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); + options.set_prefix_extractor(prefix_extractor); + options.set_merge_operator_associative("increment", utils::increment_rocksdb); + + rocksdb::ColumnFamilyDescriptor::new(name, options) + }), + )?; + + Ok(Arc::new(Engine { + rocks: db, + old_cfs: cfs, + })) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + if !self.old_cfs.contains(&name.to_owned()) { + // Create if it didn't exist + let mut options = rocksdb::Options::default(); + let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); + options.set_prefix_extractor(prefix_extractor); + options.set_merge_operator_associative("increment", utils::increment_rocksdb); + + let _ = self.rocks.create_cf(name, &options); + println!("created cf"); + } + + Ok(Arc::new(RocksDbEngineTree { + name, + db: Arc::clone(self), + watchers: RwLock::new(HashMap::new()), + })) + } + + fn flush(self: &Arc) -> Result<()> { + // TODO? + Ok(()) + } +} + +impl RocksDbEngineTree<'_> { + fn cf(&self) -> rocksdb::BoundColumnFamily<'_> { + self.db.rocks.cf_handle(self.name).unwrap() + } +} + +impl Tree for RocksDbEngineTree<'_> { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self.db.rocks.get_cf(self.cf(), key)?) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + + Ok(self.db.rocks.put_cf(self.cf(), key, value)?) + } + + fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { + for (key, value) in iter { + self.db.rocks.put_cf(self.cf(), key, value)?; + } + + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + Ok(self.db.rocks.delete_cf(self.cf(), key)?) + } + + fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { + Box::new( + self.db + .rocks + .iterator_cf(self.cf(), rocksdb::IteratorMode::Start) + .map(|(k, v)| (Vec::from(k), Vec::from(v))), + ) + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + 'a> { + Box::new( + self.db + .rocks + .iterator_cf( + self.cf(), + rocksdb::IteratorMode::From( + from, + if backwards { + rocksdb::Direction::Reverse + } else { + rocksdb::Direction::Forward + }, + ), + ) + .map(|(k, v)| (Vec::from(k), Vec::from(v))), + ) + } + + fn increment(&self, key: &[u8]) -> Result> { + // TODO: make atomic + let old = self.db.rocks.get_cf(self.cf(), &key)?; + let new = utils::increment(old.as_deref()).unwrap(); + self.db.rocks.put_cf(self.cf(), key, &new)?; + Ok(new) + } + + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + for key in iter { + let old = self.db.rocks.get_cf(self.cf(), &key)?; + let new = utils::increment(old.as_deref()).unwrap(); + self.db.rocks.put_cf(self.cf(), key, new)?; + } + + Ok(()) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + 'a> { + Box::new( + self.db + .rocks + .iterator_cf( + self.cf(), + rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward), + ) + .map(|(k, v)| (Vec::from(k), Vec::from(v))) + .take_while(move |(k, _)| k.starts_with(&prefix)), + ) + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 1d2038c..e53ab42 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -134,7 +134,7 @@ type TupleOfBytes = (Vec, Vec); impl SqliteTable { #[tracing::instrument(skip(self, guard, key))] fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result>> { - //dbg!(&self.name); + dbg!(&self.name); Ok(guard .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? .query_row([key], |row| row.get(0)) @@ -143,7 +143,7 @@ impl SqliteTable { #[tracing::instrument(skip(self, guard, key, value))] fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { - //dbg!(&self.name); + dbg!(&self.name); guard.execute( format!( "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", @@ -170,14 +170,14 @@ impl SqliteTable { let statement_ref = NonAliasingBox(statement); - //let name = self.name.clone(); + let name = self.name.clone(); let iterator = Box::new( statement .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() .map(move |r| { - //dbg!(&name); + dbg!(&name); r.unwrap() }), ); @@ -285,7 +285,7 @@ impl Tree for SqliteTable { let guard = self.engine.read_lock_iterator(); let from = from.to_vec(); // TODO change interface? - //let name = self.name.clone(); + let name = self.name.clone(); if backwards { let statement = Box::leak(Box::new( @@ -304,7 +304,7 @@ impl Tree for SqliteTable { .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() .map(move |r| { - //dbg!(&name); + dbg!(&name); r.unwrap() }), ); @@ -329,7 +329,7 @@ impl Tree for SqliteTable { .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() .map(move |r| { - //dbg!(&name); + dbg!(&name); r.unwrap() }), ); diff --git a/src/error.rs b/src/error.rs index 7faddc9..4d427da 100644 --- a/src/error.rs +++ b/src/error.rs @@ -39,6 +39,12 @@ pub enum Error { #[cfg(feature = "heed")] #[error("There was a problem with the connection to the heed database: {error}")] HeedError { error: String }, + #[cfg(feature = "rocksdb")] + #[error("There was a problem with the connection to the rocksdb database: {source}")] + RocksDbError { + #[from] + source: rocksdb::Error, + }, #[error("Could not generate an image.")] ImageError { #[from] diff --git a/src/utils.rs b/src/utils.rs index 26d71a8..4702d05 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -29,6 +29,17 @@ pub fn increment(old: Option<&[u8]>) -> Option> { Some(number.to_be_bytes().to_vec()) } +#[cfg(feature = "rocksdb")] +pub fn increment_rocksdb( + _new_key: &[u8], + old: Option<&[u8]>, + _operands: &mut rocksdb::MergeOperands, +) -> Option> { + dbg!(_new_key); + dbg!(old); + increment(old) +} + pub fn generate_keypair() -> Vec { let mut value = random_string(8).as_bytes().to_vec(); value.push(0xff); From 214795cae7edaec03aae7aa6c77941b9c2983061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Fri, 10 Dec 2021 21:34:45 +0100 Subject: [PATCH 02/11] rocksdb as default --- Cargo.toml | 2 +- src/database.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2ab993a..a8099aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ hmac = "0.11.0" sha-1 = "0.9.8" [features] -default = ["conduit_bin", "backend_sqlite"] +default = ["conduit_bin", "backend_rocksdb"] backend_sled = ["sled"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] diff --git a/src/database.rs b/src/database.rs index a0f5cb5..bd05798 100644 --- a/src/database.rs +++ b/src/database.rs @@ -318,10 +318,10 @@ impl Database { .expect("pdu cache capacity fits into usize"), )), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), - shorteventid_cache: Mutex::new(LruCache::new(100_000_000)), - eventidshort_cache: Mutex::new(LruCache::new(100_000_000)), - shortstatekey_cache: Mutex::new(LruCache::new(100_000_000)), - statekeyshort_cache: Mutex::new(LruCache::new(100_000_000)), + shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), + eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), + shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), + statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), stateinfo_cache: Mutex::new(LruCache::new(1000)), From 08eed81139ba47d8d126f8255ef40ce4ea586bba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 16 Dec 2021 14:52:19 +0100 Subject: [PATCH 03/11] fix: stack overflows when fetching auth events --- src/database/abstraction/rocksdb.rs | 18 ++-- src/server_server.rs | 145 +++++++++++++++------------- 2 files changed, 88 insertions(+), 75 deletions(-) diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index a8077f0..c922156 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -22,14 +22,20 @@ impl DatabaseEngine for Engine { fn open(config: &Config) -> Result> { let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); - db_opts.set_max_open_files(16); + db_opts.set_max_open_files(512); db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); - db_opts.set_target_file_size_base(256 << 20); - db_opts.set_write_buffer_size(256 << 20); + db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd); + db_opts.set_target_file_size_base(2 << 22); + db_opts.set_max_bytes_for_level_base(2 << 24); + db_opts.set_max_bytes_for_level_multiplier(2.0); + db_opts.set_num_levels(8); + db_opts.set_write_buffer_size(2 << 27); + + let rocksdb_cache = rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize).unwrap(); let mut block_based_options = rocksdb::BlockBasedOptions::default(); - block_based_options.set_block_size(512 << 10); + block_based_options.set_block_size(2 << 19); + block_based_options.set_block_cache(&rocksdb_cache); db_opts.set_block_based_table_factory(&block_based_options); let cfs = rocksdb::DBWithThreadMode::::list_cf( @@ -45,7 +51,6 @@ impl DatabaseEngine for Engine { let mut options = rocksdb::Options::default(); let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); options.set_prefix_extractor(prefix_extractor); - options.set_merge_operator_associative("increment", utils::increment_rocksdb); rocksdb::ColumnFamilyDescriptor::new(name, options) }), @@ -63,7 +68,6 @@ impl DatabaseEngine for Engine { let mut options = rocksdb::Options::default(); let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); options.set_prefix_extractor(prefix_extractor); - options.set_merge_operator_associative("increment", utils::increment_rocksdb); let _ = self.rocks.create_cf(name, &options); println!("created cf"); diff --git a/src/server_server.rs b/src/server_server.rs index 594152a..d6bc9b9 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1392,12 +1392,11 @@ async fn upgrade_outlier_to_timeline_pdu( let mut starting_events = Vec::with_capacity(leaf_state.len()); for (k, id) in leaf_state { - let k = db - .rooms - .get_statekey_from_short(k) - .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; - - state.insert(k, id.clone()); + if let Ok(k) = db.rooms.get_statekey_from_short(k) { + state.insert(k, id.clone()); + } else { + warn!("Failed to get_statekey_from_short."); + } starting_events.push(id); } @@ -1755,11 +1754,16 @@ async fn upgrade_outlier_to_timeline_pdu( .into_iter() .map(|map| { map.into_iter() - .map(|(k, id)| db.rooms.get_statekey_from_short(k).map(|k| (k, id))) - .collect::>>() + .filter_map(|(k, id)| { + db.rooms + .get_statekey_from_short(k) + .map(|k| (k, id)) + .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) + .ok() + }) + .collect::>() }) - .collect::>() - .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; + .collect(); let state = match state_res::resolve( room_version_id, @@ -1871,73 +1875,78 @@ pub(crate) fn fetch_and_handle_outliers<'a>( // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu_json checks both) - let local_pdu = db.rooms.get_pdu(id); - let pdu = match local_pdu { - Ok(Some(pdu)) => { + if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { + trace!("Found {} in db", id); + pdus.push((local_pdu, None)); + } + + // c. Ask origin server over federation + // We also handle its auth chain here so we don't get a stack overflow in + // handle_outlier_pdu. + let mut todo_auth_events = vec![id]; + let mut events_in_reverse_order = Vec::new(); + while let Some(next_id) = todo_auth_events.pop() { + if let Ok(Some(_)) = db.rooms.get_pdu(next_id) { trace!("Found {} in db", id); - (pdu, None) + continue; } - Ok(None) => { - // c. Ask origin server over federation - warn!("Fetching {} over federation.", id); - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_event::v1::Request { event_id: id }, - ) - .await - { - Ok(res) => { - warn!("Got {} over federation", id); - let (calculated_event_id, value) = - match crate::pdu::gen_event_id_canonical_json(&res.pdu) { - Ok(t) => t, - Err(_) => { - back_off((**id).to_owned()); - continue; - } - }; - if calculated_event_id != **id { - warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", - id, calculated_event_id, &res.pdu); - } - - // This will also fetch the auth chain - match handle_outlier_pdu( - origin, - create_event, - id, - room_id, - value.clone(), - db, - pub_key_map, - ) - .await - { - Ok((pdu, json)) => (pdu, Some(json)), - Err(e) => { - warn!("Authentication of event {} failed: {:?}", id, e); - back_off((**id).to_owned()); + warn!("Fetching {} over federation.", next_id); + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_event::v1::Request { event_id: next_id }, + ) + .await + { + Ok(res) => { + warn!("Got {} over federation", next_id); + let (calculated_event_id, value) = + match crate::pdu::gen_event_id_canonical_json(&res.pdu) { + Ok(t) => t, + Err(_) => { + back_off((**next_id).to_owned()); continue; } - } - } - Err(_) => { - warn!("Failed to fetch event: {}", id); - back_off((**id).to_owned()); - continue; + }; + + if calculated_event_id != **next_id { + warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", + next_id, calculated_event_id, &res.pdu); } + + events_in_reverse_order.push((next_id, value)); + } + Err(_) => { + warn!("Failed to fetch event: {}", next_id); + back_off((**next_id).to_owned()); } } - Err(e) => { - warn!("Error loading {}: {}", id, e); - continue; + } + + while let Some((next_id, value)) = events_in_reverse_order.pop() { + match handle_outlier_pdu( + origin, + create_event, + next_id, + room_id, + value.clone(), + db, + pub_key_map, + ) + .await + { + Ok((pdu, json)) => { + pdus.push((pdu, Some(json))); + } + Err(e) => { + warn!("Authentication of event {} failed: {:?}", next_id, e); + back_off((**next_id).to_owned()); + } } - }; - pdus.push(pdu); + } } pdus }) From 463ded00b2a67e58380aeb52c02a8500729705cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 16 Dec 2021 15:54:42 +0100 Subject: [PATCH 04/11] fix auth event fetching --- src/server_server.rs | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index d6bc9b9..28c3ea0 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1878,15 +1878,16 @@ pub(crate) fn fetch_and_handle_outliers<'a>( if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { trace!("Found {} in db", id); pdus.push((local_pdu, None)); + continue; } // c. Ask origin server over federation // We also handle its auth chain here so we don't get a stack overflow in // handle_outlier_pdu. - let mut todo_auth_events = vec![id]; + let mut todo_auth_events = vec![Arc::clone(id)]; let mut events_in_reverse_order = Vec::new(); while let Some(next_id) = todo_auth_events.pop() { - if let Ok(Some(_)) = db.rooms.get_pdu(next_id) { + if let Ok(Some(_)) = db.rooms.get_pdu(&next_id) { trace!("Found {} in db", id); continue; } @@ -1897,7 +1898,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( .send_federation_request( &db.globals, origin, - get_event::v1::Request { event_id: next_id }, + get_event::v1::Request { event_id: &next_id }, ) .await { @@ -1907,21 +1908,35 @@ pub(crate) fn fetch_and_handle_outliers<'a>( match crate::pdu::gen_event_id_canonical_json(&res.pdu) { Ok(t) => t, Err(_) => { - back_off((**next_id).to_owned()); + back_off((*next_id).to_owned()); continue; } }; - if calculated_event_id != **next_id { + if calculated_event_id != *next_id { warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", next_id, calculated_event_id, &res.pdu); } + + if let Some(auth_events) = value.get("auth_events").and_then(|c| c.as_array()) { + for auth_event in auth_events { + if let Some(Ok(auth_event)) = auth_event.as_str() + .map(|e| serde_json::from_str(e)) { + todo_auth_events.push(auth_event); + } else { + warn!("Auth event id is not valid"); + } + } + } else { + warn!("Auth event list invalid"); + } + events_in_reverse_order.push((next_id, value)); } Err(_) => { warn!("Failed to fetch event: {}", next_id); - back_off((**next_id).to_owned()); + back_off((*next_id).to_owned()); } } } @@ -1930,7 +1945,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( match handle_outlier_pdu( origin, create_event, - next_id, + &next_id, room_id, value.clone(), db, @@ -1943,7 +1958,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( } Err(e) => { warn!("Authentication of event {} failed: {:?}", next_id, e); - back_off((**next_id).to_owned()); + back_off((*next_id).to_owned()); } } } From fcacb225f36c8c782a4a149145bc371d41847c81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 16 Dec 2021 21:42:53 +0100 Subject: [PATCH 05/11] dbg --- src/server_server.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index 28c3ea0..b6bea0c 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1922,8 +1922,9 @@ pub(crate) fn fetch_and_handle_outliers<'a>( if let Some(auth_events) = value.get("auth_events").and_then(|c| c.as_array()) { for auth_event in auth_events { if let Some(Ok(auth_event)) = auth_event.as_str() - .map(|e| serde_json::from_str(e)) { - todo_auth_events.push(auth_event); + .map(|e| {let ev: std::result::Result, _> = dbg!(serde_json::from_str(dbg!(e))); ev}) { + let a: Arc = auth_event; + todo_auth_events.push(a); } else { warn!("Auth event id is not valid"); } From 28a04ce7dbe4050ab1166863adb558cd128e12d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 16 Dec 2021 22:25:24 +0100 Subject: [PATCH 06/11] fix? --- src/server_server.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index b6bea0c..8c5c09f 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1921,8 +1921,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( if let Some(auth_events) = value.get("auth_events").and_then(|c| c.as_array()) { for auth_event in auth_events { - if let Some(Ok(auth_event)) = auth_event.as_str() - .map(|e| {let ev: std::result::Result, _> = dbg!(serde_json::from_str(dbg!(e))); ev}) { + if let Ok(auth_event) = serde_json::from_value(auth_event.clone().into()) { let a: Arc = auth_event; todo_auth_events.push(a); } else { From 2ebe6853be1a3f6f5b3792eef32d870ab509374a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 19 Dec 2021 10:48:28 +0100 Subject: [PATCH 07/11] improvement, maybe not safe --- src/server_server.rs | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index 8c5c09f..57f5586 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1686,25 +1686,6 @@ async fn upgrade_outlier_to_timeline_pdu( // We do this by adding the current state to the list of fork states extremity_sstatehashes.remove(¤t_sstatehash); fork_states.push(current_state_ids); - dbg!(&extremity_sstatehashes); - - for (sstatehash, leaf_pdu) in extremity_sstatehashes { - let mut leaf_state = db - .rooms - .state_full_ids(sstatehash) - .map_err(|_| "Failed to ask db for room state.".to_owned())?; - - if let Some(state_key) = &leaf_pdu.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey(&leaf_pdu.kind, state_key, &db.globals) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - leaf_state.insert(shortstatekey, Arc::from(&*leaf_pdu.event_id)); - // Now it's the state after the pdu - } - - fork_states.push(leaf_state); - } // We also add state after incoming event to the fork states let mut state_after = state_at_incoming_event.clone(); @@ -1941,7 +1922,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( } } - while let Some((next_id, value)) = events_in_reverse_order.pop() { + for (next_id, value) in events_in_reverse_order { match handle_outlier_pdu( origin, create_event, @@ -1954,7 +1935,9 @@ pub(crate) fn fetch_and_handle_outliers<'a>( .await { Ok((pdu, json)) => { - pdus.push((pdu, Some(json))); + if next_id == *id { + pdus.push((pdu, Some(json))); + } } Err(e) => { warn!("Authentication of event {} failed: {:?}", next_id, e); From 4556db411dd5b7c4600d9bb3df9b01dd8989519a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 20 Dec 2021 10:16:22 +0100 Subject: [PATCH 08/11] fix: atomic increment --- src/database/abstraction/rocksdb.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index c922156..17dbf66 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -16,6 +16,7 @@ pub struct RocksDbEngineTree<'a> { db: Arc, name: &'a str, watchers: RwLock, Vec>>>, + write_lock: RwLock<()> } impl DatabaseEngine for Engine { @@ -77,6 +78,7 @@ impl DatabaseEngine for Engine { name, db: Arc::clone(self), watchers: RwLock::new(HashMap::new()), + write_lock: RwLock::new(()), })) } @@ -120,7 +122,13 @@ impl Tree for RocksDbEngineTree<'_> { } } - Ok(self.db.rocks.put_cf(self.cf(), key, value)?) + let lock = self.write_lock.read().unwrap(); + + let result = self.db.rocks.put_cf(self.cf(), key, value)?; + + drop(lock); + + Ok(result) } fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { @@ -168,20 +176,27 @@ impl Tree for RocksDbEngineTree<'_> { } fn increment(&self, key: &[u8]) -> Result> { - // TODO: make atomic + let lock = self.write_lock.write().unwrap(); + let old = self.db.rocks.get_cf(self.cf(), &key)?; let new = utils::increment(old.as_deref()).unwrap(); self.db.rocks.put_cf(self.cf(), key, &new)?; + + drop(lock); Ok(new) } fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let lock = self.write_lock.write().unwrap(); + for key in iter { let old = self.db.rocks.get_cf(self.cf(), &key)?; let new = utils::increment(old.as_deref()).unwrap(); self.db.rocks.put_cf(self.cf(), key, new)?; } + drop(lock); + Ok(()) } From d7046ebb625701dd3f220a8ba5d58b69c808bd27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 21 Dec 2021 16:02:12 +0100 Subject: [PATCH 09/11] improvement: don't fetch event multiple times --- src/database/abstraction/rocksdb.rs | 6 ++++-- src/server_server.rs | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 17dbf66..5444295 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -16,7 +16,7 @@ pub struct RocksDbEngineTree<'a> { db: Arc, name: &'a str, watchers: RwLock, Vec>>>, - write_lock: RwLock<()> + write_lock: RwLock<()>, } impl DatabaseEngine for Engine { @@ -32,7 +32,9 @@ impl DatabaseEngine for Engine { db_opts.set_num_levels(8); db_opts.set_write_buffer_size(2 << 27); - let rocksdb_cache = rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize).unwrap(); + let rocksdb_cache = + rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize) + .unwrap(); let mut block_based_options = rocksdb::BlockBasedOptions::default(); block_based_options.set_block_size(2 << 19); diff --git a/src/server_server.rs b/src/server_server.rs index 57f5586..6e8ebf3 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1867,7 +1867,12 @@ pub(crate) fn fetch_and_handle_outliers<'a>( // handle_outlier_pdu. let mut todo_auth_events = vec![Arc::clone(id)]; let mut events_in_reverse_order = Vec::new(); + let mut events_all = HashSet::new(); while let Some(next_id) = todo_auth_events.pop() { + if events_all.contains(&next_id) { + continue; + } + if let Ok(Some(_)) = db.rooms.get_pdu(&next_id) { trace!("Found {} in db", id); continue; @@ -1899,10 +1904,13 @@ pub(crate) fn fetch_and_handle_outliers<'a>( next_id, calculated_event_id, &res.pdu); } - - if let Some(auth_events) = value.get("auth_events").and_then(|c| c.as_array()) { + if let Some(auth_events) = + value.get("auth_events").and_then(|c| c.as_array()) + { for auth_event in auth_events { - if let Ok(auth_event) = serde_json::from_value(auth_event.clone().into()) { + if let Ok(auth_event) = + serde_json::from_value(auth_event.clone().into()) + { let a: Arc = auth_event; todo_auth_events.push(a); } else { @@ -1913,7 +1921,8 @@ pub(crate) fn fetch_and_handle_outliers<'a>( warn!("Auth event list invalid"); } - events_in_reverse_order.push((next_id, value)); + events_in_reverse_order.push((next_id.clone(), value)); + events_all.insert(next_id); } Err(_) => { warn!("Failed to fetch event: {}", next_id); From c9070df6c07d2237e15b315cbadf9dc589ba66e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 21 Dec 2021 22:10:31 +0100 Subject: [PATCH 10/11] fix: auth event fetch order --- src/server_server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index 6e8ebf3..c76afd3 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1931,7 +1931,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( } } - for (next_id, value) in events_in_reverse_order { + for (next_id, value) in events_in_reverse_order.iter().rev() { match handle_outlier_pdu( origin, create_event, @@ -1944,13 +1944,13 @@ pub(crate) fn fetch_and_handle_outliers<'a>( .await { Ok((pdu, json)) => { - if next_id == *id { + if next_id == id { pdus.push((pdu, Some(json))); } } Err(e) => { warn!("Authentication of event {} failed: {:?}", next_id, e); - back_off((*next_id).to_owned()); + back_off((**next_id).to_owned()); } } } From 5f64e24a6e90bc75184aa6f194b136c7d0cc90c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 23 Dec 2021 10:36:14 +0100 Subject: [PATCH 11/11] improvement: copy watcher deduplication from sqlite --- src/database/abstraction/rocksdb.rs | 48 ++++++++++++++--------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 5444295..c0fda30 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -1,11 +1,11 @@ use super::super::Config; use crate::{utils, Result}; - use std::{future::Future, pin::Pin, sync::Arc}; - +use std::{ + collections::{hash_map, HashMap}, + sync::RwLock}; use super::{DatabaseEngine, Tree}; - -use std::{collections::HashMap, sync::RwLock}; +use tokio::sync::watch; pub struct Engine { rocks: rocksdb::DBWithThreadMode, @@ -15,7 +15,7 @@ pub struct Engine { pub struct RocksDbEngineTree<'a> { db: Arc, name: &'a str, - watchers: RwLock, Vec>>>, + watchers: RwLock, (watch::Sender<()>, watch::Receiver<()>)>>, write_lock: RwLock<()>, } @@ -102,6 +102,12 @@ impl Tree for RocksDbEngineTree<'_> { } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let lock = self.write_lock.read().unwrap(); + + let result = self.db.rocks.put_cf(self.cf(), key, value)?; + + drop(lock); + let watchers = self.watchers.read().unwrap(); let mut triggered = Vec::new(); @@ -116,19 +122,11 @@ impl Tree for RocksDbEngineTree<'_> { if !triggered.is_empty() { let mut watchers = self.watchers.write().unwrap(); for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } + if let Some(tx) = watchers.remove(prefix) { + let _ = tx.0.send(()); } } - } - - let lock = self.write_lock.read().unwrap(); - - let result = self.db.rocks.put_cf(self.cf(), key, value)?; - - drop(lock); + }; Ok(result) } @@ -219,18 +217,18 @@ impl Tree for RocksDbEngineTree<'_> { } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); + let mut rx = match self.watchers.write().unwrap().entry(prefix.to_vec()) { + hash_map::Entry::Occupied(o) => o.get().1.clone(), + hash_map::Entry::Vacant(v) => { + let (tx, rx) = tokio::sync::watch::channel(()); + v.insert((tx, rx.clone())); + rx + } + }; Box::pin(async move { // Tx is never destroyed - rx.await.unwrap(); + rx.changed().await.unwrap(); }) } }