|
|
|
@ -24,7 +24,7 @@ use ruma::{ |
|
|
|
use std::{ |
|
|
|
use std::{ |
|
|
|
collections::{BTreeMap, BTreeSet, HashMap, HashSet}, |
|
|
|
collections::{BTreeMap, BTreeSet, HashMap, HashSet}, |
|
|
|
convert::{TryFrom, TryInto}, |
|
|
|
convert::{TryFrom, TryInto}, |
|
|
|
mem, |
|
|
|
mem::size_of, |
|
|
|
sync::{Arc, Mutex}, |
|
|
|
sync::{Arc, Mutex}, |
|
|
|
}; |
|
|
|
}; |
|
|
|
use tokio::sync::MutexGuard; |
|
|
|
use tokio::sync::MutexGuard; |
|
|
|
@ -37,10 +37,11 @@ use super::{abstraction::Tree, admin::AdminCommand, pusher}; |
|
|
|
/// This is created when a state group is added to the database by
|
|
|
|
/// This is created when a state group is added to the database by
|
|
|
|
/// hashing the entire state.
|
|
|
|
/// hashing the entire state.
|
|
|
|
pub type StateHashId = Vec<u8>; |
|
|
|
pub type StateHashId = Vec<u8>; |
|
|
|
|
|
|
|
pub type CompressedStateEvent = [u8; 2 * size_of::<u64>()]; |
|
|
|
|
|
|
|
|
|
|
|
pub struct Rooms { |
|
|
|
pub struct Rooms { |
|
|
|
pub edus: edus::RoomEdus, |
|
|
|
pub edus: edus::RoomEdus, |
|
|
|
pub(super) pduid_pdu: Arc<dyn Tree>, // PduId = RoomId + Count
|
|
|
|
pub(super) pduid_pdu: Arc<dyn Tree>, // PduId = ShortRoomId + Count
|
|
|
|
pub(super) eventid_pduid: Arc<dyn Tree>, |
|
|
|
pub(super) eventid_pduid: Arc<dyn Tree>, |
|
|
|
pub(super) roomid_pduleaves: Arc<dyn Tree>, |
|
|
|
pub(super) roomid_pduleaves: Arc<dyn Tree>, |
|
|
|
pub(super) alias_roomid: Arc<dyn Tree>, |
|
|
|
pub(super) alias_roomid: Arc<dyn Tree>, |
|
|
|
@ -79,9 +80,6 @@ pub struct Rooms { |
|
|
|
pub(super) eventid_shorteventid: Arc<dyn Tree>, |
|
|
|
pub(super) eventid_shorteventid: Arc<dyn Tree>, |
|
|
|
|
|
|
|
|
|
|
|
pub(super) statehash_shortstatehash: Arc<dyn Tree>, |
|
|
|
pub(super) statehash_shortstatehash: Arc<dyn Tree>, |
|
|
|
/// ShortStateHash = Count
|
|
|
|
|
|
|
|
/// StateId = ShortStateHash
|
|
|
|
|
|
|
|
pub(super) stateid_shorteventid: Arc<dyn Tree>, |
|
|
|
|
|
|
|
pub(super) shortstatehash_statediff: Arc<dyn Tree>, // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--)
|
|
|
|
pub(super) shortstatehash_statediff: Arc<dyn Tree>, // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--)
|
|
|
|
|
|
|
|
|
|
|
|
/// RoomId + EventId -> outlier PDU.
|
|
|
|
/// RoomId + EventId -> outlier PDU.
|
|
|
|
@ -100,29 +98,30 @@ impl Rooms { |
|
|
|
/// Builds a StateMap by iterating over all keys that start
|
|
|
|
/// Builds a StateMap by iterating over all keys that start
|
|
|
|
/// with state_hash, this gives the full state for the given state_hash.
|
|
|
|
/// with state_hash, this gives the full state for the given state_hash.
|
|
|
|
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeSet<EventId>> { |
|
|
|
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeSet<EventId>> { |
|
|
|
Ok(self |
|
|
|
let full_state = self |
|
|
|
.stateid_shorteventid |
|
|
|
.load_shortstatehash_info(shortstatehash)? |
|
|
|
.scan_prefix(shortstatehash.to_be_bytes().to_vec()) |
|
|
|
.pop() |
|
|
|
.map(|(_, bytes)| { |
|
|
|
.expect("there is always one layer") |
|
|
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) |
|
|
|
.1; |
|
|
|
.ok() |
|
|
|
full_state |
|
|
|
}) |
|
|
|
.into_iter() |
|
|
|
.flatten() |
|
|
|
.map(|compressed| self.parse_compressed_state_event(compressed)) |
|
|
|
.collect()) |
|
|
|
.collect() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn state_full( |
|
|
|
pub fn state_full( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
shortstatehash: u64, |
|
|
|
shortstatehash: u64, |
|
|
|
) -> Result<HashMap<(EventType, String), Arc<PduEvent>>> { |
|
|
|
) -> Result<HashMap<(EventType, String), Arc<PduEvent>>> { |
|
|
|
let state = self |
|
|
|
let full_state = self |
|
|
|
.stateid_shorteventid |
|
|
|
.load_shortstatehash_info(shortstatehash)? |
|
|
|
.scan_prefix(shortstatehash.to_be_bytes().to_vec()) |
|
|
|
.pop() |
|
|
|
.map(|(_, bytes)| { |
|
|
|
.expect("there is always one layer") |
|
|
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) |
|
|
|
.1; |
|
|
|
.ok() |
|
|
|
Ok(full_state |
|
|
|
}) |
|
|
|
.into_iter() |
|
|
|
.flatten() |
|
|
|
.map(|compressed| self.parse_compressed_state_event(compressed)) |
|
|
|
|
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.map(|eventid| self.get_pdu(&eventid)) |
|
|
|
.map(|eventid| self.get_pdu(&eventid)) |
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
.map(|pdu| { |
|
|
|
.map(|pdu| { |
|
|
|
@ -138,9 +137,7 @@ impl Rooms { |
|
|
|
)) |
|
|
|
)) |
|
|
|
}) |
|
|
|
}) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.collect(); |
|
|
|
.collect()) |
|
|
|
|
|
|
|
|
|
|
|
Ok(state) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
|
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
|
|
@ -151,27 +148,19 @@ impl Rooms { |
|
|
|
event_type: &EventType, |
|
|
|
event_type: &EventType, |
|
|
|
state_key: &str, |
|
|
|
state_key: &str, |
|
|
|
) -> Result<Option<EventId>> { |
|
|
|
) -> Result<Option<EventId>> { |
|
|
|
let mut key = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
let shortstatekey = match self.get_shortstatekey(event_type, state_key)? { |
|
|
|
key.push(0xff); |
|
|
|
Some(s) => s, |
|
|
|
key.extend_from_slice(&state_key.as_bytes()); |
|
|
|
None => return Ok(None), |
|
|
|
|
|
|
|
}; |
|
|
|
let shortstatekey = self.statekey_shortstatekey.get(&key)?; |
|
|
|
let full_state = self |
|
|
|
|
|
|
|
.load_shortstatehash_info(shortstatehash)? |
|
|
|
if let Some(shortstatekey) = shortstatekey { |
|
|
|
.pop() |
|
|
|
let mut stateid = shortstatehash.to_be_bytes().to_vec(); |
|
|
|
.expect("there is always one layer") |
|
|
|
stateid.extend_from_slice(&shortstatekey); |
|
|
|
.1; |
|
|
|
|
|
|
|
Ok(full_state |
|
|
|
Ok(self |
|
|
|
.into_iter() |
|
|
|
.stateid_shorteventid |
|
|
|
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) |
|
|
|
.get(&stateid)? |
|
|
|
.and_then(|compressed| self.parse_compressed_state_event(compressed).ok())) |
|
|
|
.map(|bytes| { |
|
|
|
|
|
|
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) |
|
|
|
|
|
|
|
.ok() |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.flatten()) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
Ok(None) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
|
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
|
|
@ -260,8 +249,7 @@ impl Rooms { |
|
|
|
|
|
|
|
|
|
|
|
/// Checks if a room exists.
|
|
|
|
/// Checks if a room exists.
|
|
|
|
pub fn exists(&self, room_id: &RoomId) -> Result<bool> { |
|
|
|
pub fn exists(&self, room_id: &RoomId) -> Result<bool> { |
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Look for PDUs in that room.
|
|
|
|
// Look for PDUs in that room.
|
|
|
|
Ok(self |
|
|
|
Ok(self |
|
|
|
@ -274,8 +262,7 @@ impl Rooms { |
|
|
|
|
|
|
|
|
|
|
|
/// Checks if a room exists.
|
|
|
|
/// Checks if a room exists.
|
|
|
|
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> { |
|
|
|
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> { |
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Look for PDUs in that room.
|
|
|
|
// Look for PDUs in that room.
|
|
|
|
self.pduid_pdu |
|
|
|
self.pduid_pdu |
|
|
|
@ -292,74 +279,78 @@ impl Rooms { |
|
|
|
|
|
|
|
|
|
|
|
/// Force the creation of a new StateHash and insert it into the db.
|
|
|
|
/// Force the creation of a new StateHash and insert it into the db.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
|
|
|
|
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
|
|
|
pub fn force_state( |
|
|
|
pub fn force_state( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
state: HashMap<(EventType, String), EventId>, |
|
|
|
new_state: HashMap<(EventType, String), EventId>, |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
) -> Result<()> { |
|
|
|
) -> Result<()> { |
|
|
|
|
|
|
|
let previous_shortstatehash = self.current_shortstatehash(&room_id)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let new_state_ids_compressed = new_state |
|
|
|
|
|
|
|
.iter() |
|
|
|
|
|
|
|
.filter_map(|((event_type, state_key), event_id)| { |
|
|
|
|
|
|
|
let shortstatekey = self |
|
|
|
|
|
|
|
.get_or_create_shortstatekey(event_type, state_key, &db.globals) |
|
|
|
|
|
|
|
.ok()?; |
|
|
|
|
|
|
|
Some( |
|
|
|
|
|
|
|
self.compress_state_event(shortstatekey, event_id, &db.globals) |
|
|
|
|
|
|
|
.ok()?, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.collect::<HashSet<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
let state_hash = self.calculate_hash( |
|
|
|
let state_hash = self.calculate_hash( |
|
|
|
&state |
|
|
|
&new_state |
|
|
|
.values() |
|
|
|
.values() |
|
|
|
.map(|event_id| event_id.as_bytes()) |
|
|
|
.map(|event_id| event_id.as_bytes()) |
|
|
|
.collect::<Vec<_>>(), |
|
|
|
.collect::<Vec<_>>(), |
|
|
|
); |
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
let (shortstatehash, already_existed) = |
|
|
|
let (new_shortstatehash, already_existed) = |
|
|
|
self.get_or_create_shortstatehash(&state_hash, &db.globals)?; |
|
|
|
self.get_or_create_shortstatehash(&state_hash, &db.globals)?; |
|
|
|
|
|
|
|
|
|
|
|
let new_state = if !already_existed { |
|
|
|
if Some(new_shortstatehash) == previous_shortstatehash { |
|
|
|
let mut new_state = HashSet::new(); |
|
|
|
return Ok(()); |
|
|
|
|
|
|
|
} |
|
|
|
let batch = state |
|
|
|
|
|
|
|
.iter() |
|
|
|
|
|
|
|
.filter_map(|((event_type, state_key), eventid)| { |
|
|
|
|
|
|
|
new_state.insert(eventid.clone()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut statekey = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
|
|
|
|
statekey.push(0xff); |
|
|
|
|
|
|
|
statekey.extend_from_slice(&state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { |
|
|
|
|
|
|
|
Some(shortstatekey) => shortstatekey.to_vec(), |
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
|
|
let shortstatekey = db.globals.next_count().ok()?; |
|
|
|
|
|
|
|
self.statekey_shortstatekey |
|
|
|
|
|
|
|
.insert(&statekey, &shortstatekey.to_be_bytes()) |
|
|
|
|
|
|
|
.ok()?; |
|
|
|
|
|
|
|
shortstatekey.to_be_bytes().to_vec() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let shorteventid = self |
|
|
|
|
|
|
|
.get_or_create_shorteventid(&eventid, &db.globals) |
|
|
|
|
|
|
|
.ok()?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut state_id = shortstatehash.to_be_bytes().to_vec(); |
|
|
|
|
|
|
|
state_id.extend_from_slice(&shortstatekey); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Some((state_id, shorteventid.to_be_bytes().to_vec())) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.stateid_shorteventid |
|
|
|
let states_parents = previous_shortstatehash |
|
|
|
.insert_batch(&mut batch.into_iter())?; |
|
|
|
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; |
|
|
|
|
|
|
|
|
|
|
|
new_state |
|
|
|
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
let statediffnew = new_state_ids_compressed |
|
|
|
|
|
|
|
.difference(&parent_stateinfo.1) |
|
|
|
|
|
|
|
.cloned() |
|
|
|
|
|
|
|
.collect::<HashSet<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let statediffremoved = parent_stateinfo |
|
|
|
|
|
|
|
.1 |
|
|
|
|
|
|
|
.difference(&new_state_ids_compressed) |
|
|
|
|
|
|
|
.cloned() |
|
|
|
|
|
|
|
.collect::<HashSet<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(statediffnew, statediffremoved) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
self.state_full_ids(shortstatehash)?.into_iter().collect() |
|
|
|
(new_state_ids_compressed, HashSet::new()) |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
let old_state = self |
|
|
|
if !already_existed { |
|
|
|
.current_shortstatehash(&room_id)? |
|
|
|
self.save_state_from_diff( |
|
|
|
.map(|s| self.state_full_ids(s)) |
|
|
|
new_shortstatehash, |
|
|
|
.transpose()? |
|
|
|
statediffnew.clone(), |
|
|
|
.map(|vec| vec.into_iter().collect::<HashSet<_>>()) |
|
|
|
statediffremoved.clone(), |
|
|
|
.unwrap_or_default(); |
|
|
|
2, // every state change is 2 event changes on average
|
|
|
|
|
|
|
|
states_parents, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
for event_id in new_state.difference(&old_state) { |
|
|
|
for event_id in statediffnew |
|
|
|
if let Some(pdu) = self.get_pdu_json(event_id)? { |
|
|
|
.into_iter() |
|
|
|
|
|
|
|
.filter_map(|new| self.parse_compressed_state_event(new).ok()) |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
if let Some(pdu) = self.get_pdu_json(&event_id)? { |
|
|
|
if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") { |
|
|
|
if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") { |
|
|
|
if let Ok(pdu) = serde_json::from_value::<PduEvent>( |
|
|
|
if let Ok(pdu) = serde_json::from_value::<PduEvent>( |
|
|
|
serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"), |
|
|
|
serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"), |
|
|
|
@ -392,7 +383,206 @@ impl Rooms { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
self.roomid_shortstatehash |
|
|
|
self.roomid_shortstatehash |
|
|
|
.insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?; |
|
|
|
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
|
|
|
|
|
|
|
pub fn load_shortstatehash_info( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
shortstatehash: u64, |
|
|
|
|
|
|
|
) -> Result< |
|
|
|
|
|
|
|
Vec<( |
|
|
|
|
|
|
|
u64, // sstatehash
|
|
|
|
|
|
|
|
HashSet<CompressedStateEvent>, // full state
|
|
|
|
|
|
|
|
HashSet<CompressedStateEvent>, // added
|
|
|
|
|
|
|
|
HashSet<CompressedStateEvent>, // removed
|
|
|
|
|
|
|
|
)>, |
|
|
|
|
|
|
|
> { |
|
|
|
|
|
|
|
let value = self |
|
|
|
|
|
|
|
.shortstatehash_statediff |
|
|
|
|
|
|
|
.get(&shortstatehash.to_be_bytes())? |
|
|
|
|
|
|
|
.ok_or_else(|| Error::bad_database("State hash does not exist"))?; |
|
|
|
|
|
|
|
let parent = |
|
|
|
|
|
|
|
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut add_mode = true; |
|
|
|
|
|
|
|
let mut added = HashSet::new(); |
|
|
|
|
|
|
|
let mut removed = HashSet::new(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut i = size_of::<u64>(); |
|
|
|
|
|
|
|
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) { |
|
|
|
|
|
|
|
if add_mode && v.starts_with(&0_u64.to_be_bytes()) { |
|
|
|
|
|
|
|
add_mode = false; |
|
|
|
|
|
|
|
i += size_of::<u64>(); |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if add_mode { |
|
|
|
|
|
|
|
added.insert(v.try_into().expect("we checked the size above")); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
removed.insert(v.try_into().expect("we checked the size above")); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
i += 2 * size_of::<u64>(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if parent != 0_u64 { |
|
|
|
|
|
|
|
let mut response = self.load_shortstatehash_info(parent)?; |
|
|
|
|
|
|
|
let mut state = response.last().unwrap().1.clone(); |
|
|
|
|
|
|
|
state.extend(added.iter().cloned()); |
|
|
|
|
|
|
|
for r in &removed { |
|
|
|
|
|
|
|
state.remove(r); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
response.push((shortstatehash, state, added, removed)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ok(response) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
let mut response = Vec::new(); |
|
|
|
|
|
|
|
response.push((shortstatehash, added.clone(), added, removed)); |
|
|
|
|
|
|
|
Ok(response) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn compress_state_event( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
shortstatekey: u64, |
|
|
|
|
|
|
|
event_id: &EventId, |
|
|
|
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
|
|
|
) -> Result<CompressedStateEvent> { |
|
|
|
|
|
|
|
let mut v = shortstatekey.to_be_bytes().to_vec(); |
|
|
|
|
|
|
|
v.extend_from_slice( |
|
|
|
|
|
|
|
&self |
|
|
|
|
|
|
|
.get_or_create_shorteventid(event_id, globals)? |
|
|
|
|
|
|
|
.to_be_bytes(), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
Ok(v.try_into().expect("we checked the size above")) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn parse_compressed_state_event( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
compressed_event: CompressedStateEvent, |
|
|
|
|
|
|
|
) -> Result<EventId> { |
|
|
|
|
|
|
|
self.get_eventid_from_short( |
|
|
|
|
|
|
|
utils::u64_from_bytes(&compressed_event[size_of::<u64>()..]) |
|
|
|
|
|
|
|
.expect("bytes have right length"), |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Creates a new shortstatehash that often is just a diff to an already existing
|
|
|
|
|
|
|
|
/// shortstatehash and therefore very efficient.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer
|
|
|
|
|
|
|
|
/// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0
|
|
|
|
|
|
|
|
/// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's
|
|
|
|
|
|
|
|
/// based on layer n-2. If that layer is also too big, it will recursively fix above layers too.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// * `shortstatehash` - Shortstatehash of this state
|
|
|
|
|
|
|
|
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
|
|
|
|
|
|
|
|
/// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid
|
|
|
|
|
|
|
|
/// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer
|
|
|
|
|
|
|
|
/// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer
|
|
|
|
|
|
|
|
pub fn save_state_from_diff( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
shortstatehash: u64, |
|
|
|
|
|
|
|
statediffnew: HashSet<CompressedStateEvent>, |
|
|
|
|
|
|
|
statediffremoved: HashSet<CompressedStateEvent>, |
|
|
|
|
|
|
|
diff_to_sibling: usize, |
|
|
|
|
|
|
|
mut parent_states: Vec<( |
|
|
|
|
|
|
|
u64, // sstatehash
|
|
|
|
|
|
|
|
HashSet<CompressedStateEvent>, // full state
|
|
|
|
|
|
|
|
HashSet<CompressedStateEvent>, // added
|
|
|
|
|
|
|
|
HashSet<CompressedStateEvent>, // removed
|
|
|
|
|
|
|
|
)>, |
|
|
|
|
|
|
|
) -> Result<()> { |
|
|
|
|
|
|
|
let diffsum = statediffnew.len() + statediffremoved.len(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if parent_states.len() > 3 { |
|
|
|
|
|
|
|
// Number of layers
|
|
|
|
|
|
|
|
// To many layers, we have to go deeper
|
|
|
|
|
|
|
|
let parent = parent_states.pop().unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut parent_new = parent.2; |
|
|
|
|
|
|
|
let mut parent_removed = parent.3; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for removed in statediffremoved { |
|
|
|
|
|
|
|
if !parent_new.remove(&removed) { |
|
|
|
|
|
|
|
parent_removed.insert(removed); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
parent_new.extend(statediffnew); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.save_state_from_diff( |
|
|
|
|
|
|
|
shortstatehash, |
|
|
|
|
|
|
|
parent_new, |
|
|
|
|
|
|
|
parent_removed, |
|
|
|
|
|
|
|
diffsum, |
|
|
|
|
|
|
|
parent_states, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if parent_states.len() == 0 { |
|
|
|
|
|
|
|
// There is no parent layer, create a new state
|
|
|
|
|
|
|
|
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
|
|
|
|
|
|
|
for new in &statediffnew { |
|
|
|
|
|
|
|
value.extend_from_slice(&new[..]); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !statediffremoved.is_empty() { |
|
|
|
|
|
|
|
warn!("Tried to create new state with removals"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.shortstatehash_statediff |
|
|
|
|
|
|
|
.insert(&shortstatehash.to_be_bytes(), &value)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Ok(()); |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Else we have two options.
|
|
|
|
|
|
|
|
// 1. We add the current diff on top of the parent layer.
|
|
|
|
|
|
|
|
// 2. We replace a layer above
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let parent = parent_states.pop().unwrap(); |
|
|
|
|
|
|
|
let parent_diff = parent.2.len() + parent.3.len(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff { |
|
|
|
|
|
|
|
// Diff too big, we replace above layer(s)
|
|
|
|
|
|
|
|
let mut parent_new = parent.2; |
|
|
|
|
|
|
|
let mut parent_removed = parent.3; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for removed in statediffremoved { |
|
|
|
|
|
|
|
if !parent_new.remove(&removed) { |
|
|
|
|
|
|
|
parent_removed.insert(removed); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
parent_new.extend(statediffnew); |
|
|
|
|
|
|
|
self.save_state_from_diff( |
|
|
|
|
|
|
|
shortstatehash, |
|
|
|
|
|
|
|
parent_new, |
|
|
|
|
|
|
|
parent_removed, |
|
|
|
|
|
|
|
diffsum, |
|
|
|
|
|
|
|
parent_states, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
// Diff small enough, we add diff as layer on top of parent
|
|
|
|
|
|
|
|
let mut value = parent.0.to_be_bytes().to_vec(); |
|
|
|
|
|
|
|
for new in &statediffnew { |
|
|
|
|
|
|
|
value.extend_from_slice(&new[..]); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !statediffremoved.is_empty() { |
|
|
|
|
|
|
|
value.extend_from_slice(&0_u64.to_be_bytes()); |
|
|
|
|
|
|
|
for removed in &statediffremoved { |
|
|
|
|
|
|
|
value.extend_from_slice(&removed[..]); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.shortstatehash_statediff |
|
|
|
|
|
|
|
.insert(&shortstatehash.to_be_bytes(), &value)?; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -418,7 +608,6 @@ impl Rooms { |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Returns (shortstatehash, already_existed)
|
|
|
|
|
|
|
|
pub fn get_or_create_shorteventid( |
|
|
|
pub fn get_or_create_shorteventid( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
event_id: &EventId, |
|
|
|
event_id: &EventId, |
|
|
|
@ -438,6 +627,71 @@ impl Rooms { |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn get_shortroomid(&self, room_id: &RoomId) -> Result<u64> { |
|
|
|
|
|
|
|
let bytes = self |
|
|
|
|
|
|
|
.roomid_shortroomid |
|
|
|
|
|
|
|
.get(&room_id.as_bytes())? |
|
|
|
|
|
|
|
.expect("every room has a shortroomid"); |
|
|
|
|
|
|
|
utils::u64_from_bytes(&bytes).map_err(|_| Error::bad_database("Invalid shortroomid in db.")) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn get_shortstatekey( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
event_type: &EventType, |
|
|
|
|
|
|
|
state_key: &str, |
|
|
|
|
|
|
|
) -> Result<Option<u64>> { |
|
|
|
|
|
|
|
let mut statekey = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
|
|
|
|
statekey.push(0xff); |
|
|
|
|
|
|
|
statekey.extend_from_slice(&state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.statekey_shortstatekey |
|
|
|
|
|
|
|
.get(&statekey)? |
|
|
|
|
|
|
|
.map(|shortstatekey| { |
|
|
|
|
|
|
|
utils::u64_from_bytes(&shortstatekey) |
|
|
|
|
|
|
|
.map_err(|_| Error::bad_database("Invalid shortstatekey in db.")) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.transpose() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn get_or_create_shortroomid( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
room_id: &RoomId, |
|
|
|
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
|
|
|
) -> Result<u64> { |
|
|
|
|
|
|
|
Ok(match self.roomid_shortroomid.get(&room_id.as_bytes())? { |
|
|
|
|
|
|
|
Some(short) => utils::u64_from_bytes(&short) |
|
|
|
|
|
|
|
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))?, |
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
|
|
let short = globals.next_count()?; |
|
|
|
|
|
|
|
self.roomid_shortroomid |
|
|
|
|
|
|
|
.insert(&room_id.as_bytes(), &short.to_be_bytes())?; |
|
|
|
|
|
|
|
short |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn get_or_create_shortstatekey( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
event_type: &EventType, |
|
|
|
|
|
|
|
state_key: &str, |
|
|
|
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
|
|
|
) -> Result<u64> { |
|
|
|
|
|
|
|
let mut statekey = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
|
|
|
|
statekey.push(0xff); |
|
|
|
|
|
|
|
statekey.extend_from_slice(&state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ok(match self.statekey_shortstatekey.get(&statekey)? { |
|
|
|
|
|
|
|
Some(shortstatekey) => utils::u64_from_bytes(&shortstatekey) |
|
|
|
|
|
|
|
.map_err(|_| Error::bad_database("Invalid shortstatekey in db."))?, |
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
|
|
let shortstatekey = globals.next_count()?; |
|
|
|
|
|
|
|
self.statekey_shortstatekey |
|
|
|
|
|
|
|
.insert(&statekey, &shortstatekey.to_be_bytes())?; |
|
|
|
|
|
|
|
shortstatekey |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> { |
|
|
|
pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> { |
|
|
|
if let Some(id) = self |
|
|
|
if let Some(id) = self |
|
|
|
.shorteventid_cache |
|
|
|
.shorteventid_cache |
|
|
|
@ -514,7 +768,7 @@ impl Rooms { |
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
pub fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> { |
|
|
|
pub fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> { |
|
|
|
Ok( |
|
|
|
Ok( |
|
|
|
utils::u64_from_bytes(&pdu_id[pdu_id.len() - mem::size_of::<u64>()..pdu_id.len()]) |
|
|
|
utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..]) |
|
|
|
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?, |
|
|
|
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?, |
|
|
|
) |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -527,8 +781,7 @@ impl Rooms { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result<u64> { |
|
|
|
pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result<u64> { |
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut last_possible_key = prefix.clone(); |
|
|
|
let mut last_possible_key = prefix.clone(); |
|
|
|
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); |
|
|
|
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); |
|
|
|
@ -758,6 +1011,8 @@ impl Rooms { |
|
|
|
///
|
|
|
|
///
|
|
|
|
/// By this point the incoming event should be fully authenticated, no auth happens
|
|
|
|
/// By this point the incoming event should be fully authenticated, no auth happens
|
|
|
|
/// in `append_pdu`.
|
|
|
|
/// in `append_pdu`.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Returns pdu id
|
|
|
|
#[tracing::instrument(skip(self, pdu, pdu_json, leaves, db))] |
|
|
|
#[tracing::instrument(skip(self, pdu, pdu_json, leaves, db))] |
|
|
|
pub fn append_pdu( |
|
|
|
pub fn append_pdu( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
@ -766,7 +1021,8 @@ impl Rooms { |
|
|
|
leaves: &[EventId], |
|
|
|
leaves: &[EventId], |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
) -> Result<Vec<u8>> { |
|
|
|
) -> Result<Vec<u8>> { |
|
|
|
// returns pdu id
|
|
|
|
let shortroomid = self.get_shortroomid(&pdu.room_id)?; |
|
|
|
|
|
|
|
|
|
|
|
// Make unsigned fields correct. This is not properly documented in the spec, but state
|
|
|
|
// Make unsigned fields correct. This is not properly documented in the spec, but state
|
|
|
|
// events need to have previous content in the unsigned field, so clients can easily
|
|
|
|
// events need to have previous content in the unsigned field, so clients can easily
|
|
|
|
// interpret things like membership changes
|
|
|
|
// interpret things like membership changes
|
|
|
|
@ -821,8 +1077,7 @@ impl Rooms { |
|
|
|
self.reset_notification_counts(&pdu.sender, &pdu.room_id)?; |
|
|
|
self.reset_notification_counts(&pdu.sender, &pdu.room_id)?; |
|
|
|
|
|
|
|
|
|
|
|
let count2 = db.globals.next_count()?; |
|
|
|
let count2 = db.globals.next_count()?; |
|
|
|
let mut pdu_id = pdu.room_id.as_bytes().to_vec(); |
|
|
|
let mut pdu_id = shortroomid.to_be_bytes().to_vec(); |
|
|
|
pdu_id.push(0xff); |
|
|
|
|
|
|
|
pdu_id.extend_from_slice(&count2.to_be_bytes()); |
|
|
|
pdu_id.extend_from_slice(&count2.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
// There's a brief moment of time here where the count is updated but the pdu does not
|
|
|
|
// There's a brief moment of time here where the count is updated but the pdu does not
|
|
|
|
@ -968,8 +1223,7 @@ impl Rooms { |
|
|
|
.filter(|word| word.len() <= 50) |
|
|
|
.filter(|word| word.len() <= 50) |
|
|
|
.map(str::to_lowercase) |
|
|
|
.map(str::to_lowercase) |
|
|
|
.map(|word| { |
|
|
|
.map(|word| { |
|
|
|
let mut key = pdu.room_id.as_bytes().to_vec(); |
|
|
|
let mut key = shortroomid.to_be_bytes().to_vec(); |
|
|
|
key.push(0xff); |
|
|
|
|
|
|
|
key.extend_from_slice(word.as_bytes()); |
|
|
|
key.extend_from_slice(word.as_bytes()); |
|
|
|
key.push(0xff); |
|
|
|
key.push(0xff); |
|
|
|
key.extend_from_slice(&pdu_id); |
|
|
|
key.extend_from_slice(&pdu_id); |
|
|
|
@ -1152,11 +1406,27 @@ impl Rooms { |
|
|
|
pub fn set_event_state( |
|
|
|
pub fn set_event_state( |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
event_id: &EventId, |
|
|
|
event_id: &EventId, |
|
|
|
|
|
|
|
room_id: &RoomId, |
|
|
|
state: &StateMap<Arc<PduEvent>>, |
|
|
|
state: &StateMap<Arc<PduEvent>>, |
|
|
|
globals: &super::globals::Globals, |
|
|
|
globals: &super::globals::Globals, |
|
|
|
) -> Result<()> { |
|
|
|
) -> Result<()> { |
|
|
|
let shorteventid = self.get_or_create_shorteventid(&event_id, globals)?; |
|
|
|
let shorteventid = self.get_or_create_shorteventid(&event_id, globals)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let previous_shortstatehash = self.current_shortstatehash(&room_id)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let state_ids_compressed = state |
|
|
|
|
|
|
|
.iter() |
|
|
|
|
|
|
|
.filter_map(|((event_type, state_key), pdu)| { |
|
|
|
|
|
|
|
let shortstatekey = self |
|
|
|
|
|
|
|
.get_or_create_shortstatekey(event_type, state_key, globals) |
|
|
|
|
|
|
|
.ok()?; |
|
|
|
|
|
|
|
Some( |
|
|
|
|
|
|
|
self.compress_state_event(shortstatekey, &pdu.event_id, globals) |
|
|
|
|
|
|
|
.ok()?, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.collect::<HashSet<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
let state_hash = self.calculate_hash( |
|
|
|
let state_hash = self.calculate_hash( |
|
|
|
&state |
|
|
|
&state |
|
|
|
.values() |
|
|
|
.values() |
|
|
|
@ -1168,37 +1438,33 @@ impl Rooms { |
|
|
|
self.get_or_create_shortstatehash(&state_hash, globals)?; |
|
|
|
self.get_or_create_shortstatehash(&state_hash, globals)?; |
|
|
|
|
|
|
|
|
|
|
|
if !already_existed { |
|
|
|
if !already_existed { |
|
|
|
let batch = state |
|
|
|
let states_parents = previous_shortstatehash |
|
|
|
.iter() |
|
|
|
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; |
|
|
|
.filter_map(|((event_type, state_key), pdu)| { |
|
|
|
|
|
|
|
let mut statekey = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
let (statediffnew, statediffremoved) = |
|
|
|
statekey.push(0xff); |
|
|
|
if let Some(parent_stateinfo) = states_parents.last() { |
|
|
|
statekey.extend_from_slice(&state_key.as_bytes()); |
|
|
|
let statediffnew = state_ids_compressed |
|
|
|
|
|
|
|
.difference(&parent_stateinfo.1) |
|
|
|
let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { |
|
|
|
.cloned() |
|
|
|
Some(shortstatekey) => shortstatekey.to_vec(), |
|
|
|
.collect::<HashSet<_>>(); |
|
|
|
None => { |
|
|
|
|
|
|
|
let shortstatekey = globals.next_count().ok()?; |
|
|
|
let statediffremoved = parent_stateinfo |
|
|
|
self.statekey_shortstatekey |
|
|
|
.1 |
|
|
|
.insert(&statekey, &shortstatekey.to_be_bytes()) |
|
|
|
.difference(&state_ids_compressed) |
|
|
|
.ok()?; |
|
|
|
.cloned() |
|
|
|
shortstatekey.to_be_bytes().to_vec() |
|
|
|
.collect::<HashSet<_>>(); |
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
(statediffnew, statediffremoved) |
|
|
|
|
|
|
|
} else { |
|
|
|
let shorteventid = self |
|
|
|
(state_ids_compressed, HashSet::new()) |
|
|
|
.get_or_create_shorteventid(&pdu.event_id, globals) |
|
|
|
}; |
|
|
|
.ok()?; |
|
|
|
self.save_state_from_diff( |
|
|
|
|
|
|
|
shortstatehash, |
|
|
|
let mut state_id = shortstatehash.to_be_bytes().to_vec(); |
|
|
|
statediffnew.clone(), |
|
|
|
state_id.extend_from_slice(&shortstatekey); |
|
|
|
statediffremoved.clone(), |
|
|
|
|
|
|
|
1_000_000, // high number because no state will be based on this one
|
|
|
|
Some((state_id, shorteventid.to_be_bytes().to_vec())) |
|
|
|
states_parents, |
|
|
|
}) |
|
|
|
)?; |
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.stateid_shorteventid |
|
|
|
|
|
|
|
.insert_batch(&mut batch.into_iter())?; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
self.shorteventid_shortstatehash |
|
|
|
self.shorteventid_shortstatehash |
|
|
|
@ -1219,82 +1485,52 @@ impl Rooms { |
|
|
|
) -> Result<u64> { |
|
|
|
) -> Result<u64> { |
|
|
|
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?; |
|
|
|
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?; |
|
|
|
|
|
|
|
|
|
|
|
let old_state = if let Some(old_shortstatehash) = |
|
|
|
let previous_shortstatehash = self.current_shortstatehash(&new_pdu.room_id)?; |
|
|
|
self.roomid_shortstatehash.get(new_pdu.room_id.as_bytes())? |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
// Store state for event. The state does not include the event itself.
|
|
|
|
|
|
|
|
// Instead it's the state before the pdu, so the room's old state.
|
|
|
|
|
|
|
|
self.shorteventid_shortstatehash |
|
|
|
|
|
|
|
.insert(&shorteventid.to_be_bytes(), &old_shortstatehash)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if new_pdu.state_key.is_none() { |
|
|
|
|
|
|
|
return utils::u64_from_bytes(&old_shortstatehash).map_err(|_| { |
|
|
|
|
|
|
|
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash.") |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.stateid_shorteventid |
|
|
|
if let Some(p) = previous_shortstatehash { |
|
|
|
.scan_prefix(old_shortstatehash.clone()) |
|
|
|
self.shorteventid_shortstatehash |
|
|
|
// Chop the old_shortstatehash out leaving behind the short state key
|
|
|
|
.insert(&shorteventid.to_be_bytes(), &p.to_be_bytes())?; |
|
|
|
.map(|(k, v)| (k[old_shortstatehash.len()..].to_vec(), v)) |
|
|
|
} |
|
|
|
.collect::<HashMap<Vec<u8>, Vec<u8>>>() |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
HashMap::new() |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if let Some(state_key) = &new_pdu.state_key { |
|
|
|
if let Some(state_key) = &new_pdu.state_key { |
|
|
|
let mut new_state: HashMap<Vec<u8>, Vec<u8>> = old_state; |
|
|
|
let states_parents = previous_shortstatehash |
|
|
|
|
|
|
|
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; |
|
|
|
let mut new_state_key = new_pdu.kind.as_ref().as_bytes().to_vec(); |
|
|
|
|
|
|
|
new_state_key.push(0xff); |
|
|
|
|
|
|
|
new_state_key.extend_from_slice(state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let shortstatekey = match self.statekey_shortstatekey.get(&new_state_key)? { |
|
|
|
|
|
|
|
Some(shortstatekey) => shortstatekey.to_vec(), |
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
|
|
let shortstatekey = globals.next_count()?; |
|
|
|
|
|
|
|
self.statekey_shortstatekey |
|
|
|
|
|
|
|
.insert(&new_state_key, &shortstatekey.to_be_bytes())?; |
|
|
|
|
|
|
|
shortstatekey.to_be_bytes().to_vec() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
new_state.insert(shortstatekey, shorteventid.to_be_bytes().to_vec()); |
|
|
|
let shortstatekey = |
|
|
|
|
|
|
|
self.get_or_create_shortstatekey(&new_pdu.kind, &state_key, globals)?; |
|
|
|
|
|
|
|
|
|
|
|
let new_state_hash = self.calculate_hash( |
|
|
|
let replaces = states_parents |
|
|
|
&new_state |
|
|
|
.last() |
|
|
|
.values() |
|
|
|
.map(|info| { |
|
|
|
.map(|event_id| &**event_id) |
|
|
|
info.1 |
|
|
|
.collect::<Vec<_>>(), |
|
|
|
.iter() |
|
|
|
); |
|
|
|
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.unwrap_or_default(); |
|
|
|
|
|
|
|
|
|
|
|
let shortstatehash = match self.statehash_shortstatehash.get(&new_state_hash)? { |
|
|
|
// TODO: statehash with deterministic inputs
|
|
|
|
Some(shortstatehash) => { |
|
|
|
let shortstatehash = globals.next_count()?; |
|
|
|
warn!("state hash already existed?!"); |
|
|
|
|
|
|
|
utils::u64_from_bytes(&shortstatehash) |
|
|
|
|
|
|
|
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))? |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
|
|
let shortstatehash = globals.next_count()?; |
|
|
|
|
|
|
|
self.statehash_shortstatehash |
|
|
|
|
|
|
|
.insert(&new_state_hash, &shortstatehash.to_be_bytes())?; |
|
|
|
|
|
|
|
shortstatehash |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut batch = new_state.into_iter().map(|(shortstatekey, shorteventid)| { |
|
|
|
let mut statediffnew = HashSet::new(); |
|
|
|
let mut state_id = shortstatehash.to_be_bytes().to_vec(); |
|
|
|
let new = self.compress_state_event(shortstatekey, &new_pdu.event_id, globals)?; |
|
|
|
state_id.extend_from_slice(&shortstatekey); |
|
|
|
statediffnew.insert(new); |
|
|
|
(state_id, shorteventid) |
|
|
|
|
|
|
|
}); |
|
|
|
let mut statediffremoved = HashSet::new(); |
|
|
|
|
|
|
|
if let Some(replaces) = replaces { |
|
|
|
|
|
|
|
statediffremoved.insert(replaces.clone()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
self.stateid_shorteventid.insert_batch(&mut batch)?; |
|
|
|
self.save_state_from_diff( |
|
|
|
|
|
|
|
shortstatehash, |
|
|
|
|
|
|
|
statediffnew, |
|
|
|
|
|
|
|
statediffremoved, |
|
|
|
|
|
|
|
2, |
|
|
|
|
|
|
|
states_parents, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
|
|
Ok(shortstatehash) |
|
|
|
Ok(shortstatehash) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
Err(Error::bad_database( |
|
|
|
Ok(previous_shortstatehash.expect("first event in room must be a state event")) |
|
|
|
"Tried to insert non-state event into room without a state.", |
|
|
|
|
|
|
|
)) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1597,7 +1833,7 @@ impl Rooms { |
|
|
|
&'a self, |
|
|
|
&'a self, |
|
|
|
user_id: &UserId, |
|
|
|
user_id: &UserId, |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
) -> impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a { |
|
|
|
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> { |
|
|
|
self.pdus_since(user_id, room_id, 0) |
|
|
|
self.pdus_since(user_id, room_id, 0) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1609,16 +1845,17 @@ impl Rooms { |
|
|
|
user_id: &UserId, |
|
|
|
user_id: &UserId, |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
since: u64, |
|
|
|
since: u64, |
|
|
|
) -> impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a { |
|
|
|
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> { |
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Skip the first pdu if it's exactly at since, because we sent that last time
|
|
|
|
// Skip the first pdu if it's exactly at since, because we sent that last time
|
|
|
|
let mut first_pdu_id = prefix.clone(); |
|
|
|
let mut first_pdu_id = prefix.clone(); |
|
|
|
first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes()); |
|
|
|
first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
let user_id = user_id.clone(); |
|
|
|
let user_id = user_id.clone(); |
|
|
|
self.pduid_pdu |
|
|
|
|
|
|
|
|
|
|
|
Ok(self |
|
|
|
|
|
|
|
.pduid_pdu |
|
|
|
.iter_from(&first_pdu_id, false) |
|
|
|
.iter_from(&first_pdu_id, false) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
@ -1628,7 +1865,7 @@ impl Rooms { |
|
|
|
pdu.unsigned.remove("transaction_id"); |
|
|
|
pdu.unsigned.remove("transaction_id"); |
|
|
|
} |
|
|
|
} |
|
|
|
Ok((pdu_id, pdu)) |
|
|
|
Ok((pdu_id, pdu)) |
|
|
|
}) |
|
|
|
})) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Returns an iterator over all events and their tokens in a room that happened before the
|
|
|
|
/// Returns an iterator over all events and their tokens in a room that happened before the
|
|
|
|
@ -1639,10 +1876,9 @@ impl Rooms { |
|
|
|
user_id: &UserId, |
|
|
|
user_id: &UserId, |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
until: u64, |
|
|
|
until: u64, |
|
|
|
) -> impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a { |
|
|
|
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> { |
|
|
|
// Create the first part of the full pdu id
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut current = prefix.clone(); |
|
|
|
let mut current = prefix.clone(); |
|
|
|
current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until`
|
|
|
|
current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until`
|
|
|
|
@ -1650,7 +1886,9 @@ impl Rooms { |
|
|
|
let current: &[u8] = ¤t; |
|
|
|
let current: &[u8] = ¤t; |
|
|
|
|
|
|
|
|
|
|
|
let user_id = user_id.clone(); |
|
|
|
let user_id = user_id.clone(); |
|
|
|
self.pduid_pdu |
|
|
|
|
|
|
|
|
|
|
|
Ok(self |
|
|
|
|
|
|
|
.pduid_pdu |
|
|
|
.iter_from(current, true) |
|
|
|
.iter_from(current, true) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
@ -1660,7 +1898,7 @@ impl Rooms { |
|
|
|
pdu.unsigned.remove("transaction_id"); |
|
|
|
pdu.unsigned.remove("transaction_id"); |
|
|
|
} |
|
|
|
} |
|
|
|
Ok((pdu_id, pdu)) |
|
|
|
Ok((pdu_id, pdu)) |
|
|
|
}) |
|
|
|
})) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Returns an iterator over all events and their token in a room that happened after the event
|
|
|
|
/// Returns an iterator over all events and their token in a room that happened after the event
|
|
|
|
@ -1671,10 +1909,9 @@ impl Rooms { |
|
|
|
user_id: &UserId, |
|
|
|
user_id: &UserId, |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
from: u64, |
|
|
|
from: u64, |
|
|
|
) -> impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a { |
|
|
|
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> { |
|
|
|
// Create the first part of the full pdu id
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut current = prefix.clone(); |
|
|
|
let mut current = prefix.clone(); |
|
|
|
current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event
|
|
|
|
current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event
|
|
|
|
@ -1682,7 +1919,9 @@ impl Rooms { |
|
|
|
let current: &[u8] = ¤t; |
|
|
|
let current: &[u8] = ¤t; |
|
|
|
|
|
|
|
|
|
|
|
let user_id = user_id.clone(); |
|
|
|
let user_id = user_id.clone(); |
|
|
|
self.pduid_pdu |
|
|
|
|
|
|
|
|
|
|
|
Ok(self |
|
|
|
|
|
|
|
.pduid_pdu |
|
|
|
.iter_from(current, false) |
|
|
|
.iter_from(current, false) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix)) |
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
.map(move |(pdu_id, v)| { |
|
|
|
@ -1692,7 +1931,7 @@ impl Rooms { |
|
|
|
pdu.unsigned.remove("transaction_id"); |
|
|
|
pdu.unsigned.remove("transaction_id"); |
|
|
|
} |
|
|
|
} |
|
|
|
Ok((pdu_id, pdu)) |
|
|
|
Ok((pdu_id, pdu)) |
|
|
|
}) |
|
|
|
})) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Replace a PDU with the redacted form.
|
|
|
|
/// Replace a PDU with the redacted form.
|
|
|
|
@ -2223,8 +2462,8 @@ impl Rooms { |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
search_string: &str, |
|
|
|
search_string: &str, |
|
|
|
) -> Result<(impl Iterator<Item = Vec<u8>> + 'a, Vec<String>)> { |
|
|
|
) -> Result<(impl Iterator<Item = Vec<u8>> + 'a, Vec<String>)> { |
|
|
|
let mut prefix = room_id.as_bytes().to_vec(); |
|
|
|
let prefix = self.get_shortroomid(room_id)?.to_be_bytes().to_vec(); |
|
|
|
prefix.push(0xff); |
|
|
|
let prefix_clone = prefix.clone(); |
|
|
|
|
|
|
|
|
|
|
|
let words = search_string |
|
|
|
let words = search_string |
|
|
|
.split_terminator(|c: char| !c.is_alphanumeric()) |
|
|
|
.split_terminator(|c: char| !c.is_alphanumeric()) |
|
|
|
@ -2243,16 +2482,7 @@ impl Rooms { |
|
|
|
.iter_from(&last_possible_id, true) // Newest pdus first
|
|
|
|
.iter_from(&last_possible_id, true) // Newest pdus first
|
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix2)) |
|
|
|
.take_while(move |(k, _)| k.starts_with(&prefix2)) |
|
|
|
.map(|(key, _)| { |
|
|
|
.map(|(key, _)| { |
|
|
|
let pduid_index = key |
|
|
|
let pdu_id = key[key.len() - size_of::<u64>()..].to_vec(); |
|
|
|
.iter() |
|
|
|
|
|
|
|
.enumerate() |
|
|
|
|
|
|
|
.filter(|(_, &b)| b == 0xff) |
|
|
|
|
|
|
|
.nth(1) |
|
|
|
|
|
|
|
.ok_or_else(|| Error::bad_database("Invalid tokenid in db."))? |
|
|
|
|
|
|
|
.0 |
|
|
|
|
|
|
|
+ 1; // +1 because the pdu id starts AFTER the separator
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let pdu_id = key[pduid_index..].to_vec(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ok::<_, Error>(pdu_id) |
|
|
|
Ok::<_, Error>(pdu_id) |
|
|
|
}) |
|
|
|
}) |
|
|
|
@ -2264,7 +2494,12 @@ impl Rooms { |
|
|
|
// We compare b with a because we reversed the iterator earlier
|
|
|
|
// We compare b with a because we reversed the iterator earlier
|
|
|
|
b.cmp(a) |
|
|
|
b.cmp(a) |
|
|
|
}) |
|
|
|
}) |
|
|
|
.unwrap(), |
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.map(move |id| { |
|
|
|
|
|
|
|
let mut pduid = prefix_clone.clone(); |
|
|
|
|
|
|
|
pduid.extend_from_slice(&id); |
|
|
|
|
|
|
|
pduid |
|
|
|
|
|
|
|
}), |
|
|
|
words, |
|
|
|
words, |
|
|
|
)) |
|
|
|
)) |
|
|
|
} |
|
|
|
} |
|
|
|
|