|
|
|
|
@ -88,7 +88,8 @@ pub struct Rooms {
@@ -88,7 +88,8 @@ pub struct Rooms {
|
|
|
|
|
pub(super) referencedevents: Arc<dyn Tree>, |
|
|
|
|
|
|
|
|
|
pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>, |
|
|
|
|
pub(super) auth_chain_cache: Mutex<LruCache<Vec<EventId>, HashSet<EventId>>>, |
|
|
|
|
pub(super) auth_chain_cache: Mutex<LruCache<u64, HashSet<u64>>>, |
|
|
|
|
pub(super) shorteventid_cache: Mutex<LruCache<u64, EventId>>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Rooms { |
|
|
|
|
@ -98,15 +99,11 @@ impl Rooms {
@@ -98,15 +99,11 @@ impl Rooms {
|
|
|
|
|
Ok(self |
|
|
|
|
.stateid_shorteventid |
|
|
|
|
.scan_prefix(shortstatehash.to_be_bytes().to_vec()) |
|
|
|
|
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten()) |
|
|
|
|
.flatten() |
|
|
|
|
.map(|bytes| { |
|
|
|
|
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { |
|
|
|
|
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") |
|
|
|
|
})?) |
|
|
|
|
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid.")) |
|
|
|
|
.map(|(_, bytes)| { |
|
|
|
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) |
|
|
|
|
.ok() |
|
|
|
|
}) |
|
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
|
.flatten() |
|
|
|
|
.collect()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -117,15 +114,11 @@ impl Rooms {
@@ -117,15 +114,11 @@ impl Rooms {
|
|
|
|
|
let state = self |
|
|
|
|
.stateid_shorteventid |
|
|
|
|
.scan_prefix(shortstatehash.to_be_bytes().to_vec()) |
|
|
|
|
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten()) |
|
|
|
|
.flatten() |
|
|
|
|
.map(|bytes| { |
|
|
|
|
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { |
|
|
|
|
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") |
|
|
|
|
})?) |
|
|
|
|
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid.")) |
|
|
|
|
.map(|(_, bytes)| { |
|
|
|
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) |
|
|
|
|
.ok() |
|
|
|
|
}) |
|
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
|
.flatten() |
|
|
|
|
.map(|eventid| self.get_pdu(&eventid)) |
|
|
|
|
.filter_map(|r| r.ok().flatten()) |
|
|
|
|
.map(|pdu| { |
|
|
|
|
@ -167,15 +160,10 @@ impl Rooms {
@@ -167,15 +160,10 @@ impl Rooms {
|
|
|
|
|
Ok(self |
|
|
|
|
.stateid_shorteventid |
|
|
|
|
.get(&stateid)? |
|
|
|
|
.map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten()) |
|
|
|
|
.flatten() |
|
|
|
|
.map(|bytes| { |
|
|
|
|
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { |
|
|
|
|
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") |
|
|
|
|
})?) |
|
|
|
|
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid.")) |
|
|
|
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap()) |
|
|
|
|
.ok() |
|
|
|
|
}) |
|
|
|
|
.map(|r| r.ok()) |
|
|
|
|
.flatten()) |
|
|
|
|
} else { |
|
|
|
|
Ok(None) |
|
|
|
|
@ -315,19 +303,7 @@ impl Rooms {
@@ -315,19 +303,7 @@ impl Rooms {
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let (shortstatehash, already_existed) = |
|
|
|
|
match self.statehash_shortstatehash.get(&state_hash)? { |
|
|
|
|
Some(shortstatehash) => ( |
|
|
|
|
utils::u64_from_bytes(&shortstatehash) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?, |
|
|
|
|
true, |
|
|
|
|
), |
|
|
|
|
None => { |
|
|
|
|
let shortstatehash = db.globals.next_count()?; |
|
|
|
|
self.statehash_shortstatehash |
|
|
|
|
.insert(&state_hash, &shortstatehash.to_be_bytes())?; |
|
|
|
|
(shortstatehash, false) |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
self.get_or_create_shortstatehash(&state_hash, &db.globals)?; |
|
|
|
|
|
|
|
|
|
let new_state = if !already_existed { |
|
|
|
|
let mut new_state = HashSet::new(); |
|
|
|
|
@ -352,25 +328,14 @@ impl Rooms {
@@ -352,25 +328,14 @@ impl Rooms {
|
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let shorteventid = |
|
|
|
|
match self.eventid_shorteventid.get(eventid.as_bytes()).ok()? { |
|
|
|
|
Some(shorteventid) => shorteventid.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shorteventid = db.globals.next_count().ok()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(eventid.as_bytes(), &shorteventid.to_be_bytes()) |
|
|
|
|
.ok()?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), eventid.as_bytes()) |
|
|
|
|
.ok()?; |
|
|
|
|
shorteventid.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let shorteventid = self |
|
|
|
|
.get_or_create_shorteventid(&eventid, &db.globals) |
|
|
|
|
.ok()?; |
|
|
|
|
|
|
|
|
|
let mut state_id = shortstatehash.to_be_bytes().to_vec(); |
|
|
|
|
state_id.extend_from_slice(&shortstatekey); |
|
|
|
|
|
|
|
|
|
Some((state_id, shorteventid)) |
|
|
|
|
Some((state_id, shorteventid.to_be_bytes().to_vec())) |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
@ -428,6 +393,76 @@ impl Rooms {
@@ -428,6 +393,76 @@ impl Rooms {
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns (shortstatehash, already_existed)
|
|
|
|
|
fn get_or_create_shortstatehash( |
|
|
|
|
&self, |
|
|
|
|
state_hash: &StateHashId, |
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
) -> Result<(u64, bool)> { |
|
|
|
|
Ok(match self.statehash_shortstatehash.get(&state_hash)? { |
|
|
|
|
Some(shortstatehash) => ( |
|
|
|
|
utils::u64_from_bytes(&shortstatehash) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?, |
|
|
|
|
true, |
|
|
|
|
), |
|
|
|
|
None => { |
|
|
|
|
let shortstatehash = globals.next_count()?; |
|
|
|
|
self.statehash_shortstatehash |
|
|
|
|
.insert(&state_hash, &shortstatehash.to_be_bytes())?; |
|
|
|
|
(shortstatehash, false) |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns (shortstatehash, already_existed)
|
|
|
|
|
pub fn get_or_create_shorteventid( |
|
|
|
|
&self, |
|
|
|
|
event_id: &EventId, |
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
) -> Result<u64> { |
|
|
|
|
Ok(match self.eventid_shorteventid.get(event_id.as_bytes())? { |
|
|
|
|
Some(shorteventid) => utils::u64_from_bytes(&shorteventid) |
|
|
|
|
.map_err(|_| Error::bad_database("Invalid shorteventid in db."))?, |
|
|
|
|
None => { |
|
|
|
|
let shorteventid = globals.next_count()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; |
|
|
|
|
shorteventid |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> { |
|
|
|
|
if let Some(id) = self |
|
|
|
|
.shorteventid_cache |
|
|
|
|
.lock() |
|
|
|
|
.unwrap() |
|
|
|
|
.get_mut(&shorteventid) |
|
|
|
|
{ |
|
|
|
|
return Ok(id.clone()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let bytes = self |
|
|
|
|
.shorteventid_eventid |
|
|
|
|
.get(&shorteventid.to_be_bytes())? |
|
|
|
|
.ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?; |
|
|
|
|
|
|
|
|
|
let event_id = |
|
|
|
|
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { |
|
|
|
|
Error::bad_database("EventID in roomid_pduleaves is invalid unicode.") |
|
|
|
|
})?) |
|
|
|
|
.map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?; |
|
|
|
|
|
|
|
|
|
self.shorteventid_cache |
|
|
|
|
.lock() |
|
|
|
|
.unwrap() |
|
|
|
|
.insert(shorteventid, event_id.clone()); |
|
|
|
|
|
|
|
|
|
Ok(event_id) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns the full room state.
|
|
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
|
pub fn room_state_full( |
|
|
|
|
@ -1116,17 +1151,7 @@ impl Rooms {
@@ -1116,17 +1151,7 @@ impl Rooms {
|
|
|
|
|
state: &StateMap<Arc<PduEvent>>, |
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
) -> Result<()> { |
|
|
|
|
let shorteventid = match self.eventid_shorteventid.get(event_id.as_bytes())? { |
|
|
|
|
Some(shorteventid) => shorteventid.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shorteventid = globals.next_count()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; |
|
|
|
|
shorteventid.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let shorteventid = self.get_or_create_shorteventid(&event_id, globals)?; |
|
|
|
|
|
|
|
|
|
let state_hash = self.calculate_hash( |
|
|
|
|
&state |
|
|
|
|
@ -1135,69 +1160,45 @@ impl Rooms {
@@ -1135,69 +1160,45 @@ impl Rooms {
|
|
|
|
|
.collect::<Vec<_>>(), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { |
|
|
|
|
Some(shortstatehash) => { |
|
|
|
|
// State already existed in db
|
|
|
|
|
self.shorteventid_shortstatehash |
|
|
|
|
.insert(&shorteventid, &*shortstatehash)?; |
|
|
|
|
return Ok(()); |
|
|
|
|
} |
|
|
|
|
None => { |
|
|
|
|
let shortstatehash = globals.next_count()?; |
|
|
|
|
self.statehash_shortstatehash |
|
|
|
|
.insert(&state_hash, &shortstatehash.to_be_bytes())?; |
|
|
|
|
shortstatehash.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let (shortstatehash, already_existed) = |
|
|
|
|
self.get_or_create_shortstatehash(&state_hash, globals)?; |
|
|
|
|
|
|
|
|
|
let batch = state |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|((event_type, state_key), pdu)| { |
|
|
|
|
let mut statekey = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
|
statekey.push(0xff); |
|
|
|
|
statekey.extend_from_slice(&state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { |
|
|
|
|
Some(shortstatekey) => shortstatekey.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shortstatekey = globals.next_count().ok()?; |
|
|
|
|
self.statekey_shortstatekey |
|
|
|
|
.insert(&statekey, &shortstatekey.to_be_bytes()) |
|
|
|
|
.ok()?; |
|
|
|
|
shortstatekey.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
if !already_existed { |
|
|
|
|
let batch = state |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|((event_type, state_key), pdu)| { |
|
|
|
|
let mut statekey = event_type.as_ref().as_bytes().to_vec(); |
|
|
|
|
statekey.push(0xff); |
|
|
|
|
statekey.extend_from_slice(&state_key.as_bytes()); |
|
|
|
|
|
|
|
|
|
let shorteventid = match self |
|
|
|
|
.eventid_shorteventid |
|
|
|
|
.get(pdu.event_id.as_bytes()) |
|
|
|
|
.ok()? |
|
|
|
|
{ |
|
|
|
|
Some(shorteventid) => shorteventid.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shorteventid = globals.next_count().ok()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes()) |
|
|
|
|
.ok()?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes()) |
|
|
|
|
.ok()?; |
|
|
|
|
shorteventid.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { |
|
|
|
|
Some(shortstatekey) => shortstatekey.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shortstatekey = globals.next_count().ok()?; |
|
|
|
|
self.statekey_shortstatekey |
|
|
|
|
.insert(&statekey, &shortstatekey.to_be_bytes()) |
|
|
|
|
.ok()?; |
|
|
|
|
shortstatekey.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let mut state_id = shortstatehash.clone(); |
|
|
|
|
state_id.extend_from_slice(&shortstatekey); |
|
|
|
|
let shorteventid = self |
|
|
|
|
.get_or_create_shorteventid(&pdu.event_id, globals) |
|
|
|
|
.ok()?; |
|
|
|
|
|
|
|
|
|
Some((state_id, shorteventid)) |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
let mut state_id = shortstatehash.to_be_bytes().to_vec(); |
|
|
|
|
state_id.extend_from_slice(&shortstatekey); |
|
|
|
|
|
|
|
|
|
self.stateid_shorteventid |
|
|
|
|
.insert_batch(&mut batch.into_iter())?; |
|
|
|
|
Some((state_id, shorteventid.to_be_bytes().to_vec())) |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
self.stateid_shorteventid |
|
|
|
|
.insert_batch(&mut batch.into_iter())?; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
self.shorteventid_shortstatehash |
|
|
|
|
.insert(&shorteventid, &*shortstatehash)?; |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?; |
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
@ -1212,26 +1213,16 @@ impl Rooms {
@@ -1212,26 +1213,16 @@ impl Rooms {
|
|
|
|
|
new_pdu: &PduEvent, |
|
|
|
|
globals: &super::globals::Globals, |
|
|
|
|
) -> Result<u64> { |
|
|
|
|
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?; |
|
|
|
|
|
|
|
|
|
let old_state = if let Some(old_shortstatehash) = |
|
|
|
|
self.roomid_shortstatehash.get(new_pdu.room_id.as_bytes())? |
|
|
|
|
{ |
|
|
|
|
// Store state for event. The state does not include the event itself.
|
|
|
|
|
// Instead it's the state before the pdu, so the room's old state.
|
|
|
|
|
|
|
|
|
|
let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? { |
|
|
|
|
Some(shorteventid) => shorteventid.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shorteventid = globals.next_count()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?; |
|
|
|
|
shorteventid.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
self.shorteventid_shortstatehash |
|
|
|
|
.insert(&shorteventid, &old_shortstatehash)?; |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), &old_shortstatehash)?; |
|
|
|
|
|
|
|
|
|
if new_pdu.state_key.is_none() { |
|
|
|
|
return utils::u64_from_bytes(&old_shortstatehash).map_err(|_| { |
|
|
|
|
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash.") |
|
|
|
|
@ -1264,19 +1255,7 @@ impl Rooms {
@@ -1264,19 +1255,7 @@ impl Rooms {
|
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? { |
|
|
|
|
Some(shorteventid) => shorteventid.to_vec(), |
|
|
|
|
None => { |
|
|
|
|
let shorteventid = globals.next_count()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?; |
|
|
|
|
shorteventid.to_be_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
new_state.insert(shortstatekey, shorteventid); |
|
|
|
|
new_state.insert(shortstatekey, shorteventid.to_be_bytes().to_vec()); |
|
|
|
|
|
|
|
|
|
let new_state_hash = self.calculate_hash( |
|
|
|
|
&new_state |
|
|
|
|
@ -1516,11 +1495,7 @@ impl Rooms {
@@ -1516,11 +1495,7 @@ impl Rooms {
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
// Generate short event id
|
|
|
|
|
let shorteventid = db.globals.next_count()?; |
|
|
|
|
self.eventid_shorteventid |
|
|
|
|
.insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; |
|
|
|
|
self.shorteventid_eventid |
|
|
|
|
.insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; |
|
|
|
|
let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?; |
|
|
|
|
|
|
|
|
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
|
|
|
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
|
|
|
|
@ -2655,9 +2630,7 @@ impl Rooms {
@@ -2655,9 +2630,7 @@ impl Rooms {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(self))] |
|
|
|
|
pub fn auth_chain_cache( |
|
|
|
|
&self, |
|
|
|
|
) -> std::sync::MutexGuard<'_, LruCache<Vec<EventId>, HashSet<EventId>>> { |
|
|
|
|
pub fn auth_chain_cache(&self) -> std::sync::MutexGuard<'_, LruCache<u64, HashSet<u64>>> { |
|
|
|
|
self.auth_chain_cache.lock().unwrap() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|