From 67749364aae4719d0d688e578fc6f4e89d8a172b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 11 Aug 2021 10:24:16 +0200 Subject: [PATCH 1/5] improvement: smaller cache, better prev event fetching --- src/database.rs | 4 ++-- src/server_server.rs | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/database.rs b/src/database.rs index bdff386..1bf9434 100644 --- a/src/database.rs +++ b/src/database.rs @@ -270,8 +270,8 @@ impl Database { eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, referencedevents: builder.open_tree("referencedevents")?, - pdu_cache: Mutex::new(LruCache::new(1_000_000)), - auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), + pdu_cache: Mutex::new(LruCache::new(100_000)), + auth_chain_cache: Mutex::new(LruCache::new(100_000)), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/server_server.rs b/src/server_server.rs index bf5e4f3..68adcd0 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1022,7 +1022,7 @@ pub fn handle_incoming_pdu<'a>( return Ok(None); } - // Load missing prev events first + // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events fetch_and_handle_events( db, origin, @@ -1033,8 +1033,6 @@ pub fn handle_incoming_pdu<'a>( ) .await; - // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities // doing all the checks in this list starting at 1. These are not timeline events. From 03d78b25f2498fdd332ed90eaf5d8f035d260ef9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 11 Aug 2021 19:15:38 +0200 Subject: [PATCH 2/5] improvement: use u64s in auth chain cache --- src/database/rooms.rs | 232 ++++++++++++++++++------------------------ src/server_server.rs | 43 +++++--- 2 files changed, 132 insertions(+), 143 deletions(-) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 0f42235..c53fa9e 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -88,7 +88,7 @@ pub struct Rooms { pub(super) referencedevents: Arc, pub(super) pdu_cache: Mutex>>, - pub(super) auth_chain_cache: Mutex, HashSet>>, + pub(super) auth_chain_cache: Mutex>>, } impl Rooms { @@ -315,19 +315,7 @@ impl Rooms { ); let (shortstatehash, already_existed) = - match self.statehash_shortstatehash.get(&state_hash)? { - Some(shortstatehash) => ( - utils::u64_from_bytes(&shortstatehash) - .map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?, - true, - ), - None => { - let shortstatehash = db.globals.next_count()?; - self.statehash_shortstatehash - .insert(&state_hash, &shortstatehash.to_be_bytes())?; - (shortstatehash, false) - } - }; + self.get_or_create_shortstatehash(&state_hash, &db.globals)?; let new_state = if !already_existed { let mut new_state = HashSet::new(); @@ -352,25 +340,14 @@ impl Rooms { } }; - let shorteventid = - match self.eventid_shorteventid.get(eventid.as_bytes()).ok()? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = db.globals.next_count().ok()?; - self.eventid_shorteventid - .insert(eventid.as_bytes(), &shorteventid.to_be_bytes()) - .ok()?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), eventid.as_bytes()) - .ok()?; - shorteventid.to_be_bytes().to_vec() - } - }; + let shorteventid = self + .get_or_create_shorteventid(&eventid, &db.globals) + .ok()?; let mut state_id = shortstatehash.to_be_bytes().to_vec(); state_id.extend_from_slice(&shortstatekey); - Some((state_id, shorteventid)) + Some((state_id, shorteventid.to_be_bytes().to_vec())) }) .collect::>(); @@ -428,6 +405,61 @@ impl Rooms { Ok(()) } + /// Returns (shortstatehash, already_existed) + fn get_or_create_shortstatehash( + &self, + state_hash: &StateHashId, + globals: &super::globals::Globals, + ) -> Result<(u64, bool)> { + Ok(match self.statehash_shortstatehash.get(&state_hash)? { + Some(shortstatehash) => ( + utils::u64_from_bytes(&shortstatehash) + .map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?, + true, + ), + None => { + let shortstatehash = globals.next_count()?; + self.statehash_shortstatehash + .insert(&state_hash, &shortstatehash.to_be_bytes())?; + (shortstatehash, false) + } + }) + } + + /// Returns (shortstatehash, already_existed) + pub fn get_or_create_shorteventid( + &self, + event_id: &EventId, + globals: &super::globals::Globals, + ) -> Result { + Ok(match self.eventid_shorteventid.get(event_id.as_bytes())? { + Some(shorteventid) => utils::u64_from_bytes(&shorteventid) + .map_err(|_| Error::bad_database("Invalid shorteventid in db."))?, + None => { + let shorteventid = globals.next_count()?; + self.eventid_shorteventid + .insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; + shorteventid + } + }) + } + + pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result { + let bytes = self + .shorteventid_eventid + .get(&shorteventid.to_be_bytes())? + .ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?; + + EventId::try_from( + utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("EventID in roomid_pduleaves is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid.")) + } + /// Returns the full room state. #[tracing::instrument(skip(self))] pub fn room_state_full( @@ -1116,17 +1148,7 @@ impl Rooms { state: &StateMap>, globals: &super::globals::Globals, ) -> Result<()> { - let shorteventid = match self.eventid_shorteventid.get(event_id.as_bytes())? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = globals.next_count()?; - self.eventid_shorteventid - .insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; - shorteventid.to_be_bytes().to_vec() - } - }; + let shorteventid = self.get_or_create_shorteventid(&event_id, globals)?; let state_hash = self.calculate_hash( &state @@ -1135,69 +1157,45 @@ impl Rooms { .collect::>(), ); - let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { - Some(shortstatehash) => { - // State already existed in db - self.shorteventid_shortstatehash - .insert(&shorteventid, &*shortstatehash)?; - return Ok(()); - } - None => { - let shortstatehash = globals.next_count()?; - self.statehash_shortstatehash - .insert(&state_hash, &shortstatehash.to_be_bytes())?; - shortstatehash.to_be_bytes().to_vec() - } - }; + let (shortstatehash, already_existed) = + self.get_or_create_shortstatehash(&state_hash, globals)?; - let batch = state - .iter() - .filter_map(|((event_type, state_key), pdu)| { - let mut statekey = event_type.as_ref().as_bytes().to_vec(); - statekey.push(0xff); - statekey.extend_from_slice(&state_key.as_bytes()); - - let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { - Some(shortstatekey) => shortstatekey.to_vec(), - None => { - let shortstatekey = globals.next_count().ok()?; - self.statekey_shortstatekey - .insert(&statekey, &shortstatekey.to_be_bytes()) - .ok()?; - shortstatekey.to_be_bytes().to_vec() - } - }; + if !already_existed { + let batch = state + .iter() + .filter_map(|((event_type, state_key), pdu)| { + let mut statekey = event_type.as_ref().as_bytes().to_vec(); + statekey.push(0xff); + statekey.extend_from_slice(&state_key.as_bytes()); - let shorteventid = match self - .eventid_shorteventid - .get(pdu.event_id.as_bytes()) - .ok()? - { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = globals.next_count().ok()?; - self.eventid_shorteventid - .insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes()) - .ok()?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes()) - .ok()?; - shorteventid.to_be_bytes().to_vec() - } - }; + let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { + Some(shortstatekey) => shortstatekey.to_vec(), + None => { + let shortstatekey = globals.next_count().ok()?; + self.statekey_shortstatekey + .insert(&statekey, &shortstatekey.to_be_bytes()) + .ok()?; + shortstatekey.to_be_bytes().to_vec() + } + }; - let mut state_id = shortstatehash.clone(); - state_id.extend_from_slice(&shortstatekey); + let shorteventid = self + .get_or_create_shorteventid(&pdu.event_id, globals) + .ok()?; - Some((state_id, shorteventid)) - }) - .collect::>(); + let mut state_id = shortstatehash.to_be_bytes().to_vec(); + state_id.extend_from_slice(&shortstatekey); + + Some((state_id, shorteventid.to_be_bytes().to_vec())) + }) + .collect::>(); - self.stateid_shorteventid - .insert_batch(&mut batch.into_iter())?; + self.stateid_shorteventid + .insert_batch(&mut batch.into_iter())?; + } self.shorteventid_shortstatehash - .insert(&shorteventid, &*shortstatehash)?; + .insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?; Ok(()) } @@ -1212,26 +1210,16 @@ impl Rooms { new_pdu: &PduEvent, globals: &super::globals::Globals, ) -> Result { + let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?; + let old_state = if let Some(old_shortstatehash) = self.roomid_shortstatehash.get(new_pdu.room_id.as_bytes())? { // Store state for event. The state does not include the event itself. // Instead it's the state before the pdu, so the room's old state. - - let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = globals.next_count()?; - self.eventid_shorteventid - .insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?; - shorteventid.to_be_bytes().to_vec() - } - }; - self.shorteventid_shortstatehash - .insert(&shorteventid, &old_shortstatehash)?; + .insert(&shorteventid.to_be_bytes(), &old_shortstatehash)?; + if new_pdu.state_key.is_none() { return utils::u64_from_bytes(&old_shortstatehash).map_err(|_| { Error::bad_database("Invalid shortstatehash in roomid_shortstatehash.") @@ -1264,19 +1252,7 @@ impl Rooms { } }; - let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = globals.next_count()?; - self.eventid_shorteventid - .insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?; - shorteventid.to_be_bytes().to_vec() - } - }; - - new_state.insert(shortstatekey, shorteventid); + new_state.insert(shortstatekey, shorteventid.to_be_bytes().to_vec()); let new_state_hash = self.calculate_hash( &new_state @@ -1516,11 +1492,7 @@ impl Rooms { ); // Generate short event id - let shorteventid = db.globals.next_count()?; - self.eventid_shorteventid - .insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; + let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?; // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. @@ -2655,9 +2627,7 @@ impl Rooms { } #[tracing::instrument(skip(self))] - pub fn auth_chain_cache( - &self, - ) -> std::sync::MutexGuard<'_, LruCache, HashSet>> { + pub fn auth_chain_cache(&self) -> std::sync::MutexGuard<'_, LruCache>> { self.auth_chain_cache.lock().unwrap() } } diff --git a/src/server_server.rs b/src/server_server.rs index 68adcd0..23c80ee 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1044,13 +1044,16 @@ pub fn handle_incoming_pdu<'a>( if incoming_pdu.prev_events.len() == 1 { let prev_event = &incoming_pdu.prev_events[0]; - let state = db + let prev_event_sstatehash = db .rooms .pdu_shortstatehash(prev_event) - .map_err(|_| "Failed talking to db".to_owned())? - .map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok()) - .flatten(); - if let Some(state) = state { + .map_err(|_| "Failed talking to db".to_owned())?; + + let state = + prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); + + if let Some(Ok(state)) = state { + warn!("Using cached state"); let mut state = fetch_and_handle_events( db, origin, @@ -1088,6 +1091,7 @@ pub fn handle_incoming_pdu<'a>( } if state_at_incoming_event.is_none() { + warn!("Calling /state_ids"); // Call /state_ids to find out what the state at this pdu is. We trust the server's // response to some extend, but we still do a lot of checks on the events match db @@ -1755,35 +1759,50 @@ fn append_incoming_pdu( fn get_auth_chain(starting_events: Vec, db: &Database) -> Result> { let mut full_auth_chain = HashSet::new(); + let starting_events = starting_events + .iter() + .map(|id| { + (db.rooms + .get_or_create_shorteventid(id, &db.globals) + .map(|s| (s, id))) + }) + .collect::>>()?; + let mut cache = db.rooms.auth_chain_cache(); - for event_id in &starting_events { - if let Some(cached) = cache.get_mut(&[event_id.clone()][..]) { + for (sevent_id, event_id) in starting_events { + if let Some(cached) = cache.get_mut(&sevent_id) { full_auth_chain.extend(cached.iter().cloned()); } else { drop(cache); let mut auth_chain = HashSet::new(); get_auth_chain_recursive(&event_id, &mut auth_chain, db)?; cache = db.rooms.auth_chain_cache(); - cache.insert(vec![event_id.clone()], auth_chain.clone()); + cache.insert(sevent_id, auth_chain.clone()); full_auth_chain.extend(auth_chain); }; } - Ok(full_auth_chain) + full_auth_chain + .into_iter() + .map(|sid| db.rooms.get_eventid_from_short(sid)) + .collect() } fn get_auth_chain_recursive( event_id: &EventId, - found: &mut HashSet, + found: &mut HashSet, db: &Database, ) -> Result<()> { let r = db.rooms.get_pdu(&event_id); match r { Ok(Some(pdu)) => { for auth_event in &pdu.auth_events { - if !found.contains(auth_event) { - found.insert(auth_event.clone()); + let sauthevent = db + .rooms + .get_or_create_shorteventid(auth_event, &db.globals)?; + if !found.contains(&sauthevent) { + found.insert(sauthevent); get_auth_chain_recursive(&auth_event, found, db)?; } } From e46f2199c9f59887b4bdd2fe16becb0e85186311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 11 Aug 2021 21:14:22 +0200 Subject: [PATCH 3/5] improvement: cache for short event ids --- src/database.rs | 1 + src/database/rooms.rs | 16 ++++++++++++++-- src/server_server.rs | 20 +++++++++----------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/database.rs b/src/database.rs index 1bf9434..7996057 100644 --- a/src/database.rs +++ b/src/database.rs @@ -272,6 +272,7 @@ impl Database { referencedevents: builder.open_tree("referencedevents")?, pdu_cache: Mutex::new(LruCache::new(100_000)), auth_chain_cache: Mutex::new(LruCache::new(100_000)), + shorteventid_cache: Mutex::new(LruCache::new(100_000)), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index c53fa9e..246aa0b 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -89,6 +89,7 @@ pub struct Rooms { pub(super) pdu_cache: Mutex>>, pub(super) auth_chain_cache: Mutex>>, + pub(super) shorteventid_cache: Mutex>, } impl Rooms { @@ -447,17 +448,28 @@ impl Rooms { } pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result { + if let Some(id) = self.shorteventid_cache.lock().unwrap().get_mut(&shorteventid) { + return Ok(id.clone()); + } + let bytes = self .shorteventid_eventid .get(&shorteventid.to_be_bytes())? .ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?; - EventId::try_from( + let event_id = EventId::try_from( utils::string_from_bytes(&bytes).map_err(|_| { Error::bad_database("EventID in roomid_pduleaves is invalid unicode.") })?, ) - .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid.")) + .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?; + + self.shorteventid_cache + .lock() + .unwrap() + .insert(shorteventid, event_id.clone()); + + Ok(event_id) } /// Returns the full room state. diff --git a/src/server_server.rs b/src/server_server.rs index 23c80ee..0b9a7e6 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1311,7 +1311,7 @@ pub fn handle_incoming_pdu<'a>( for state in fork_states { auth_chain_sets.push( get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) - .map_err(|_| "Failed to load auth chain.".to_owned())?, + .map_err(|_| "Failed to load auth chain.".to_owned())?.collect(), ); } @@ -1756,15 +1756,15 @@ fn append_incoming_pdu( Ok(pdu_id) } -fn get_auth_chain(starting_events: Vec, db: &Database) -> Result> { +fn get_auth_chain(starting_events: Vec, db: &Database) -> Result + '_> { let mut full_auth_chain = HashSet::new(); let starting_events = starting_events .iter() .map(|id| { - (db.rooms + db.rooms .get_or_create_shorteventid(id, &db.globals) - .map(|s| (s, id))) + .map(|s| (s, id)) }) .collect::>>()?; @@ -1783,10 +1783,11 @@ fn get_auth_chain(starting_events: Vec, db: &Database) -> Result(PduEvent::convert_to_outgoing_federation_event( db.rooms.get_pdu_json(&id)?.unwrap(), @@ -1996,7 +1995,7 @@ pub fn get_room_state_ids_route( let auth_chain_ids = get_auth_chain(vec![body.event_id.clone()], &db)?; Ok(get_room_state_ids::v1::Response { - auth_chain_ids: auth_chain_ids.into_iter().collect(), + auth_chain_ids: auth_chain_ids.collect(), pdu_ids, } .into()) @@ -2265,7 +2264,6 @@ pub async fn create_join_event_route( Ok(create_join_event::v2::Response { room_state: RoomState { auth_chain: auth_chain_ids - .iter() .filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten()) .map(PduEvent::convert_to_outgoing_federation_event) .collect(), From b0a2877f2091ff04c516ada06abef04f483f22fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 11 Aug 2021 21:17:01 +0200 Subject: [PATCH 4/5] less warnings --- src/database/abstraction/sqlite.rs | 39 +----------------------------- src/server_server.rs | 3 ++- 2 files changed, 3 insertions(+), 39 deletions(-) diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 72fb5f7..f420021 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -12,9 +12,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::oneshot::Sender; -use tracing::{debug, warn}; - -pub const MILLI: Duration = Duration::from_millis(1); +use tracing::debug; thread_local! { static READ_CONNECTION: RefCell> = RefCell::new(None); @@ -164,16 +162,7 @@ impl Tree for SqliteTable { #[tracing::instrument(skip(self, key, value))] fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { let guard = self.engine.write_lock(); - - let start = Instant::now(); - self.insert_with_guard(&guard, key, value)?; - - let elapsed = start.elapsed(); - if elapsed > MILLI { - warn!("insert took {:?} : {}", elapsed, &self.name); - } - drop(guard); let watchers = self.watchers.read(); @@ -220,20 +209,11 @@ impl Tree for SqliteTable { fn remove(&self, key: &[u8]) -> Result<()> { let guard = self.engine.write_lock(); - let start = Instant::now(); - guard.execute( format!("DELETE FROM {} WHERE key = ?", self.name).as_str(), [key], )?; - let elapsed = start.elapsed(); - - if elapsed > MILLI { - debug!("remove: took {:012?} : {}", elapsed, &self.name); - } - // debug!("remove key: {:?}", &key); - Ok(()) } @@ -326,8 +306,6 @@ impl Tree for SqliteTable { fn increment(&self, key: &[u8]) -> Result> { let guard = self.engine.write_lock(); - let start = Instant::now(); - let old = self.get_with_guard(&guard, key)?; let new = @@ -335,26 +313,11 @@ impl Tree for SqliteTable { self.insert_with_guard(&guard, key, &new)?; - let elapsed = start.elapsed(); - - if elapsed > MILLI { - debug!("increment: took {:012?} : {}", elapsed, &self.name); - } - // debug!("increment key: {:?}", &key); - Ok(new) } #[tracing::instrument(skip(self, prefix))] fn scan_prefix<'a>(&'a self, prefix: Vec) -> Box + 'a> { - // let name = self.name.clone(); - // self.iter_from_thread( - // format!( - // "SELECT key, value FROM {} WHERE key BETWEEN ?1 AND ?1 || X'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF' ORDER BY key ASC", - // name - // ) - // [prefix] - // ) Box::new( self.iter_from(&prefix, false) .take_while(move |(key, _)| key.starts_with(&prefix)), diff --git a/src/server_server.rs b/src/server_server.rs index 0b9a7e6..a4c90a7 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -721,7 +721,8 @@ pub async fn send_transaction_message_route( &db.globals, )?; } else { - warn!("No known event ids in read receipt: {:?}", user_updates); + // TODO fetch missing events + debug!("No known event ids in read receipt: {:?}", user_updates); } } } From 3e79b15bbf18e9d9d62f7f23b950894c491d1a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 12 Aug 2021 17:55:16 +0200 Subject: [PATCH 5/5] fix: long prev event fetch times for huge rooms --- src/database.rs | 2 +- src/database/rooms.rs | 49 ++++++++++++++++++------------------------- src/server_server.rs | 36 ++++++++++++++++++++++++------- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/src/database.rs b/src/database.rs index 7996057..7a17e53 100644 --- a/src/database.rs +++ b/src/database.rs @@ -272,7 +272,7 @@ impl Database { referencedevents: builder.open_tree("referencedevents")?, pdu_cache: Mutex::new(LruCache::new(100_000)), auth_chain_cache: Mutex::new(LruCache::new(100_000)), - shorteventid_cache: Mutex::new(LruCache::new(100_000)), + shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 246aa0b..88878e9 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -99,15 +99,11 @@ impl Rooms { Ok(self .stateid_shorteventid .scan_prefix(shortstatehash.to_be_bytes().to_vec()) - .map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten()) - .flatten() - .map(|bytes| { - EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid.")) + .map(|(_, bytes)| { + self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) + .ok() }) - .filter_map(|r| r.ok()) + .flatten() .collect()) } @@ -118,15 +114,11 @@ impl Rooms { let state = self .stateid_shorteventid .scan_prefix(shortstatehash.to_be_bytes().to_vec()) - .map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten()) - .flatten() - .map(|bytes| { - EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid.")) + .map(|(_, bytes)| { + self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) + .ok() }) - .filter_map(|r| r.ok()) + .flatten() .map(|eventid| self.get_pdu(&eventid)) .filter_map(|r| r.ok().flatten()) .map(|pdu| { @@ -168,15 +160,10 @@ impl Rooms { Ok(self .stateid_shorteventid .get(&stateid)? - .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten()) - .flatten() .map(|bytes| { - EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid.")) + self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) + .ok() }) - .map(|r| r.ok()) .flatten()) } else { Ok(None) @@ -448,7 +435,12 @@ impl Rooms { } pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result { - if let Some(id) = self.shorteventid_cache.lock().unwrap().get_mut(&shorteventid) { + if let Some(id) = self + .shorteventid_cache + .lock() + .unwrap() + .get_mut(&shorteventid) + { return Ok(id.clone()); } @@ -457,12 +449,11 @@ impl Rooms { .get(&shorteventid.to_be_bytes())? .ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?; - let event_id = EventId::try_from( - utils::string_from_bytes(&bytes).map_err(|_| { + let event_id = + EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { Error::bad_database("EventID in roomid_pduleaves is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?; + })?) + .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?; self.shorteventid_cache .lock() diff --git a/src/server_server.rs b/src/server_server.rs index a4c90a7..b3f0353 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -668,7 +668,7 @@ pub async fn send_transaction_message_route( let elapsed = start_time.elapsed(); warn!( - "Handling event {} took {}m{}s", + "Handling transaction of event {} took {}m{}s", event_id, elapsed.as_secs() / 60, elapsed.as_secs() % 60 @@ -850,6 +850,8 @@ pub fn handle_incoming_pdu<'a>( pub_key_map: &'a RwLock>>, ) -> AsyncRecursiveType<'a, StdResult>, String>> { Box::pin(async move { + let start_time = Instant::now(); + // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json match db.rooms.exists(&room_id) { Ok(true) => {} @@ -1014,12 +1016,18 @@ pub fn handle_incoming_pdu<'a>( // 8. if not timeline event: stop if !is_timeline_event || incoming_pdu.origin_server_ts - < db.rooms - .first_pdu_in_room(&room_id) - .map_err(|_| "Error loading first room event.".to_owned())? - .expect("Room exists") - .origin_server_ts + < (utils::millis_since_unix_epoch() - 1000 * 60 * 20) + .try_into() + .expect("time is valid") + // Not older than 20 mins { + let elapsed = start_time.elapsed(); + warn!( + "Handling outlier event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); return Ok(None); } @@ -1312,7 +1320,8 @@ pub fn handle_incoming_pdu<'a>( for state in fork_states { auth_chain_sets.push( get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) - .map_err(|_| "Failed to load auth chain.".to_owned())?.collect(), + .map_err(|_| "Failed to load auth chain.".to_owned())? + .collect(), ); } @@ -1385,6 +1394,14 @@ pub fn handle_incoming_pdu<'a>( // Event has passed all auth/stateres checks drop(state_lock); + + let elapsed = start_time.elapsed(); + warn!( + "Handling timeline event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); Ok(pdu_id) }) } @@ -1757,7 +1774,10 @@ fn append_incoming_pdu( Ok(pdu_id) } -fn get_auth_chain(starting_events: Vec, db: &Database) -> Result + '_> { +fn get_auth_chain( + starting_events: Vec, + db: &Database, +) -> Result + '_> { let mut full_auth_chain = HashSet::new(); let starting_events = starting_events