|
|
|
|
@ -69,6 +69,10 @@ pub struct Rooms {
@@ -69,6 +69,10 @@ pub struct Rooms {
|
|
|
|
|
/// RoomId + EventId -> outlier PDU.
|
|
|
|
|
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
|
|
|
|
|
pub(super) roomeventid_outlierpdu: sled::Tree, |
|
|
|
|
/// RoomId + EventId -> count of the last known pdu when the outlier was inserted.
|
|
|
|
|
/// This allows us to skip any state snapshots that would for sure not have the outlier.
|
|
|
|
|
pub(super) roomeventid_outlierpducount: sled::Tree, |
|
|
|
|
|
|
|
|
|
/// RoomId + EventId -> Parent PDU EventId.
|
|
|
|
|
pub(super) prevevent_parent: sled::Tree, |
|
|
|
|
} |
|
|
|
|
@ -323,6 +327,15 @@ impl Rooms {
@@ -323,6 +327,15 @@ impl Rooms {
|
|
|
|
|
.map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result<u64> { |
|
|
|
|
self.pduid_pdu |
|
|
|
|
.scan_prefix(room_id.as_bytes()) |
|
|
|
|
.last() |
|
|
|
|
.map(|b| self.pdu_count(&b?.0)) |
|
|
|
|
.transpose() |
|
|
|
|
.map(|op| op.unwrap_or_default()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns the json of a pdu.
|
|
|
|
|
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<serde_json::Value>> { |
|
|
|
|
self.eventid_pduid |
|
|
|
|
@ -490,6 +503,8 @@ impl Rooms {
@@ -490,6 +503,8 @@ impl Rooms {
|
|
|
|
|
&key, |
|
|
|
|
&*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), |
|
|
|
|
)?; |
|
|
|
|
self.roomeventid_outlierpducount |
|
|
|
|
.insert(&key, &self.latest_pdu_count(pdu.room_id())?.to_be_bytes())?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -537,7 +552,45 @@ impl Rooms {
@@ -537,7 +552,45 @@ impl Rooms {
|
|
|
|
|
let mut key = pdu.room_id().as_bytes().to_vec(); |
|
|
|
|
key.push(0xff); |
|
|
|
|
key.extend_from_slice(pdu.event_id().as_bytes()); |
|
|
|
|
self.roomeventid_outlierpdu.remove(key)?; |
|
|
|
|
if self.roomeventid_outlierpdu.remove(&key)?.is_some() { |
|
|
|
|
if let Some(state_key) = pdu.state_key.as_deref() { |
|
|
|
|
let mut statekey = pdu.kind().as_ref().as_bytes().to_vec(); |
|
|
|
|
statekey.extend_from_slice(state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
let short = match self.statekey_short.get(&statekey)? { |
|
|
|
|
Some(short) => utils::u64_from_bytes(&short).map_err(|_| { |
|
|
|
|
Error::bad_database("Invalid short bytes in statekey_short.") |
|
|
|
|
})?, |
|
|
|
|
None => { |
|
|
|
|
error!( |
|
|
|
|
"This event has been inserted into the state snapshot tree previously." |
|
|
|
|
); |
|
|
|
|
let short = db.globals.next_count()?; |
|
|
|
|
self.statekey_short |
|
|
|
|
.insert(&statekey, &short.to_be_bytes())?; |
|
|
|
|
short |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let mut start = pdu.room_id().as_bytes().to_vec(); |
|
|
|
|
start.extend_from_slice( |
|
|
|
|
&self |
|
|
|
|
.roomeventid_outlierpducount |
|
|
|
|
.get(&key)? |
|
|
|
|
.unwrap_or_default(), |
|
|
|
|
); |
|
|
|
|
for hash in self.pduid_statehash.range(start..).values() { |
|
|
|
|
let mut hash = hash?.to_vec(); |
|
|
|
|
hash.extend_from_slice(&short.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
let _ = self.stateid_pduid.compare_and_swap( |
|
|
|
|
hash, |
|
|
|
|
Some(pdu.event_id().as_bytes()), |
|
|
|
|
Some(pdu_id.as_ref()), |
|
|
|
|
)?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We must keep track of all events that have been referenced.
|
|
|
|
|
for leaf in leaves { |
|
|
|
|
|