
Resolve state when forked and keep track of state snapshots

merge-requests/10/head
Devin Ragotzy 5 years ago
parent commit 26afeb0c6d

1. src/client_server/membership.rs (2 changed lines)
2. src/database/rooms.rs (53 changed lines)
3. src/server_server.rs (502 changed lines)

src/client_server/membership.rs (2 changed lines)

@@ -667,7 +667,7 @@ async fn join_room_by_id_helper(
 // this is a `state_res::StateEvent` that holds a `ruma::Pdu`
 let pdu = event_map
 .get(ev_id)
-.expect("Found event_id in sorted events that is not in resolved state");
+.expect("found event_id in sorted events that is not in resolved state");
 // We do not rebuild the PDU in this case only insert to DB
 let count = db.globals.next_count()?;

src/database/rooms.rs (53 changed lines)

@@ -135,6 +135,29 @@ impl Rooms {
 .collect::<Result<StateMap<_>>>()
 }
+/// Returns all state entries we know of from a specific event_id onward.
+/// The PDUs are ordered most recent to least recent.
+///
+/// This is used to resolve a forward extremity. We have the other server's
+/// arm, now we need ours.
+/// TODO: Probably return Vec<PrevEvents<PduEvent>>
+pub fn state_from(&self, room_id: &RoomId, event_id: &EventId) -> Result<Vec<PduEvent>> {
+let mut last_ids = self.get_pdu_leaves(room_id)?;
+let mut collected = vec![];
+while let Some(find_id) = last_ids.pop() {
+if event_id == &find_id {
+break;
+}
+if let Some(pdu) = self.get_pdu(&find_id)? {
+last_ids.extend(pdu.prev_events.to_vec());
+collected.push(pdu);
+}
+}
+Ok(collected)
+}
 /// Returns all state entries for this type.
 pub fn state_type(
 &self,

@@ -253,6 +276,36 @@ impl Rooms {
 .is_some())
 }
+/// Force the creation of a new StateHash and insert it into the db. This also associates
+/// the given `pdu` with the new StateHash.
+///
+/// Whatever `state` is supplied to `force_state_with_pdu` __is__ the current room state snapshot.
+pub fn force_state_with_pdu(
+&self,
+new_pdu_id: &[u8],
+room_id: &RoomId,
+state: HashMap<(EventType, String), Vec<u8>>,
+) -> Result<()> {
+let state_hash =
+self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::<Vec<_>>())?;
+let mut prefix = state_hash.to_vec();
+prefix.push(0xff);
+for ((event_type, state_key), pdu_id) in state {
+let mut state_id = prefix.clone();
+state_id.extend_from_slice(&event_type.as_ref().as_bytes());
+state_id.push(0xff);
+state_id.extend_from_slice(&state_key.as_bytes());
+self.stateid_pduid.insert(state_id, pdu_id.to_vec())?;
+}
+self.pduid_statehash.insert(new_pdu_id, &*state_hash)?;
+self.roomid_statehash
+.insert(room_id.as_bytes(), &*state_hash)?;
+Ok(())
+}
 /// Force the creation of a new StateHash and insert it into the db.
 ///
 /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
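To make the key layout written by `force_state_with_pdu` above easier to follow, here is a minimal standalone sketch of the `stateid_pduid` key it builds: `state_hash 0xff event_type 0xff state_key`, mapping to a `pdu_id`. The helper name `state_id_key` and the `main` demo are illustrative only; the plain byte slices stand in for Conduit's actual sled trees.

```rust
// Sketch of the `stateid_pduid` key built by `force_state_with_pdu`:
// state_hash ++ 0xff ++ event_type ++ 0xff ++ state_key  ->  pdu_id
// (plain Vec<u8> here; Conduit stores these entries in sled trees)
fn state_id_key(state_hash: &[u8], event_type: &str, state_key: &str) -> Vec<u8> {
    let mut key = state_hash.to_vec();
    key.push(0xff); // separator after the state hash
    key.extend_from_slice(event_type.as_bytes());
    key.push(0xff); // separator after the event type
    key.extend_from_slice(state_key.as_bytes());
    key
}

fn main() {
    // Every entry of one snapshot shares the same state_hash prefix, so a
    // prefix scan over `stateid_pduid` recovers the whole snapshot.
    let key = state_id_key(b"example-state-hash", "m.room.member", "@alice:example.org");
    assert!(key.starts_with(b"example-state-hash"));
    println!("key is {} bytes long", key.len());
}
```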

src/server_server.rs (502 changed lines)

@@ -388,12 +388,12 @@ pub async fn get_public_rooms_route(
 }
 #[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
-pub enum AuthEvents<T> {
+pub enum PrevEvents<T> {
 Sequential(T),
 Fork(Vec<T>),
 }
-impl<T> IntoIterator for AuthEvents<T> {
+impl<T> IntoIterator for PrevEvents<T> {
 type Item = T;
 type IntoIter = std::vec::IntoIter<Self::Item>;

@@ -405,7 +405,7 @@ impl<T> IntoIterator for AuthEvents<T> {
 }
 }
-impl<T: Clone> AuthEvents<T> {
+impl<T: Clone> PrevEvents<T> {
 pub fn new(id: &[T]) -> Self {
 match id {
 [] => panic!("All events must have previous event"),
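The hunk above only shows the empty-slice arm of `PrevEvents::new`; the way the rest of this diff matches on the variants (and the "prev_events > 1" error below) suggests a single prev_event maps to `Sequential` while several map to `Fork`. A self-contained sketch under that assumption, with the one-element and many-element arms filled in for illustration:

```rust
// Standalone sketch of how the renamed `PrevEvents` wrapper is used in this
// commit. Only the `[] => panic!(..)` arm is visible in the hunk above; the
// Sequential/Fork split below is inferred from the rest of the diff, so treat
// it as an assumption rather than the exact upstream implementation.
#[derive(Clone, Debug, PartialEq)]
enum PrevEvents<T> {
    Sequential(T),
    Fork(Vec<T>),
}

impl<T: Clone> PrevEvents<T> {
    fn new(ids: &[T]) -> Self {
        match ids {
            [] => panic!("All events must have previous event"),
            [single] => Self::Sequential(single.clone()),
            rest => Self::Fork(rest.to_vec()),
        }
    }
}

fn main() {
    // A single prev_event keeps the linear (sequential) fast path ...
    assert_eq!(
        PrevEvents::new(&["$a:example.org"]),
        PrevEvents::Sequential("$a:example.org")
    );
    // ... while more than one prev_event marks a fork that needs state resolution.
    assert!(matches!(
        PrevEvents::new(&["$a:example.org", "$b:example.org"]),
        PrevEvents::Fork(_)
    ));
}
```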
@@ -478,15 +478,15 @@ pub async fn send_transaction_message_route<'a>(
 }
 let pdu = &body.pdus[pdu_idx];
 pdu_idx += 1;
 // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped.
 // state-res checks signatures - 2. Passes signature checks, otherwise event is dropped.
 // 3. Passes hash checks, otherwise it is redacted before being processed further.
 // TODO: redact event if hashing fails
 let (event_id, value) = crate::pdu::process_incoming_pdu(pdu);
-let pdu = serde_json::from_value::<PduEvent>(
+let mut pdu = serde_json::from_value::<PduEvent>(
 serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"),
 )
 .expect("all ruma pdus are conduit pdus");
@@ -501,23 +501,31 @@ pub async fn send_transaction_message_route<'a>(
 // The events that must be resolved to catch up to the incoming event
 let mut missing = vec![];
 let mut seen = state_res::StateMap::new();
+let mut seen_id = None;
-let mut prev_ids = vec![AuthEvents::new(&pdu.prev_events)];
+let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)];
 // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue
 // in the case of a failed request to the server that sent the event
 'inner: loop {
-// TODO: if this is ever more than 1 at a time we must do actual
-// full state resolution not just auth
 match prev_ids.pop() {
-Some(AuthEvents::Sequential(id)) => match db
+Some(PrevEvents::Sequential(id)) => match db
 .rooms
 .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
 {
-// We know the state snapshot for this events parents so we can simply auth the
-// incoming event and append to DB and append to state if it passes
+// We found a common ancestor
 Some(state_hash) => {
+seen_id = Some(id.clone());
 seen = db.rooms.state_full(&state_hash)?;
+if let Some(pdu) = db.rooms.get_pdu(&id)? {
+if pdu.state_key.is_some() {
+// This becomes the state after the common event
+seen.insert(
+(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
+pdu,
+);
+}
+}
 break 'inner;
 }
 // We need to fill in information about this event's `prev_events` (parents)
@@ -540,8 +548,8 @@ pub async fn send_transaction_message_route<'a>(
 // TODO: do we need this
 assert_eq!(room_id, &prev_pdu.room_id);
-prev_ids.push(AuthEvents::new(&prev_pdu.prev_events));
-missing.push(AuthEvents::Sequential(prev_pdu));
+prev_ids.push(PrevEvents::new(&prev_pdu.prev_events));
+missing.push(PrevEvents::Sequential(prev_pdu));
 }
 // We can't hard fail because there are some valid errors, just
 // keep checking PDU's

@@ -556,7 +564,7 @@ pub async fn send_transaction_message_route<'a>(
 };
 }
 },
-Some(AuthEvents::Fork(ids)) => {
+Some(PrevEvents::Fork(ids)) => {
 error!(
 "prev_events > 1: {}",
 serde_json::to_string_pretty(&pdu).unwrap()
@@ -583,10 +591,19 @@ pub async fn send_transaction_message_route<'a>(
 .rooms
 .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
 {
-// We know the state snapshot for this events parents so we can simply auth the
-// incoming event and append to DB and append to state if it passes
+// We found a common ancestor
 Some(state_hash) => {
+seen_id = Some(id.clone());
 seen = db.rooms.state_full(&state_hash)?;
+if let Some(pdu) = db.rooms.get_pdu(&id)? {
+if pdu.state_key.is_some() {
+// This becomes the state after the common event
+seen.insert(
+(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
+pdu,
+);
+}
+}
 break 'inner;
 }
 None => match send_request(

@@ -621,10 +638,10 @@ pub async fn send_transaction_message_route<'a>(
 },
 }
 }
-prev_ids.push(AuthEvents::new(
+prev_ids.push(PrevEvents::new(
 &prev_fork_ids.into_iter().collect::<Vec<_>>(),
 ));
-missing.push(AuthEvents::new(&missing_fork));
+missing.push(PrevEvents::new(&missing_fork));
 }
 // All done finding missing events
 None => {
@@ -633,49 +650,94 @@ pub async fn send_transaction_message_route<'a>(
 }
 }
-// Now build up state
-let mut state_snapshot = seen
-.iter()
-.map(|(k, v)| (k.clone(), v.event_id.clone()))
-.collect();
-let mut accum_event_map = seen
-.iter()
-.map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res()))
-.collect::<BTreeMap<_, Arc<state_res::StateEvent>>>();
-if !missing.is_empty() {
-// This is the state at incoming pdu
-state_snapshot = match state_res::StateResolution::resolve_incoming(
+// We can treat this event as sequential and simply apply it against the current state of the room
+// because we know that state
+if missing.is_empty() {
+// Back to the original incoming event
+// If it is a non state event we still must add it and associate a statehash with the pdu_id
+if value.get("state_key").is_none() {
+// TODO: Some auth needs to be done for non state events
+if !db.rooms.is_joined(&pdu.sender, room_id)? {
+error!("Sender is not joined {}", pdu.kind);
+resolved_map.insert(event_id, Err("Sender not found in room".into()));
+continue 'outer;
+}
+append_state(&db, &pdu)?;
+resolved_map.insert(event_id, Ok::<(), String>(()));
+continue 'outer;
+} else {
+let incoming = pdu.convert_for_state_res();
+match state_res::StateResolution::apply_event(
 room_id,
 &RoomVersionId::Version6,
-&state_snapshot,
-missing
+incoming.clone(),
+&seen
 .iter()
-.cloned()
-.flatten()
-.filter(|pdu| pdu.state_key.is_some()) // remove non state events
-.map(|pdu| ((pdu.kind, pdu.state_key.unwrap()), pdu.event_id))
-.collect(),
-Some(accum_event_map),
-),
+.map(|(k, v)| (k.clone(), v.event_id.clone()))
+.collect::<BTreeMap<_, _>>(),
+Some(
+seen.iter()
+.map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res()))
+.collect::<BTreeMap<_, _>>(),
+), // TODO: make mut and keep around, this is all the auth events
 &db.rooms,
 ) {
-Ok(res) => res,
+Ok(true) => {
+append_state(&db, &pdu)?;
+resolved_map.insert(event_id, Ok::<(), String>(()));
+continue 'outer;
+}
+Ok(false) => {
+resolved_map.insert(event_id, Err("Failed event auth".to_string()));
+error!("Failed sequential event auth for incoming");
+continue 'outer;
+}
 Err(err) => {
 resolved_map.insert(event_id, Err(err.to_string()));
 error!("{}", err);
-continue;
+continue 'outer;
+}
 }
 }
 }
-// TODO: this only accounts for sequentially missing events no holes will be filled
-// and I'm still not sure what happens when a fork introduces multiple `prev_events`
-//
-// We need to go from oldest (furthest ancestor of the incoming event) to the
-// prev_event of the incoming event so we reverse the order oldest -> most recent
-for missing_pdu in missing.into_iter().rev() {
+// Well, now we have to actually do a bunch of work :(
+// The steps are as follows
+// 1. Rebuild the sending servers forward extremity, ignoring our own fork
+// a) iterate "oldest" -> most recent authenticating each event with the state after the previous
+// b) build a `snapshot_map` containing the state after the event for each missing event (EventId -> StateMap<EventId>)
+// 2. Build our side of the fork (TODO do we have to re-auth these, is state at an event relative to the server its from)
+// 3. resolve the two states (our current with the state after the most recent missing event)
+// Now build up state
+let mut state_snapshot = seen
+.iter()
+.map(|(k, v)| (k.clone(), v.event_id.clone()))
+.collect::<BTreeMap<_, _>>();
+// TODO: So this is super memory inefficient we clone the state_snapshot every time
+// we need it for state events and non
+let mut snapshot_map = BTreeMap::new();
+snapshot_map.insert(seen_id.clone().unwrap(), state_snapshot.clone());
+let accum_event_map = seen
+.iter()
+.map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res()))
+.chain(
+missing
+.iter()
+.cloned()
+.flatten()
+.map(|pdu| (pdu.event_id.clone(), pdu.convert_for_state_res())),
+)
+.collect::<BTreeMap<_, Arc<state_res::StateEvent>>>();
+// 4. Passes authorization rules based on the event's auth events, otherwise it is rejected.
+// 5. Passes authorization rules based on the state at the event, otherwise it is rejected.
+for missing_pdu in missing.iter().rev() {
 match missing_pdu {
-AuthEvents::Sequential(missing_pdu) => {
+PrevEvents::Sequential(missing_pdu) => {
 // For state events
 if missing_pdu.state_key.is_some() {
 let missing_pdu = missing_pdu.convert_for_state_res();
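The numbered step comments in the hunk above describe the `snapshot_map` bookkeeping (step 1b: `EventId -> StateMap<EventId>`). A minimal, self-contained sketch of that idea is below; the plain `String` keys and the `track_snapshots` helper are illustrative stand-ins for Conduit's event types, not code from this commit.

```rust
use std::collections::BTreeMap;

// Walk the missing events oldest -> newest, updating the running state
// snapshot after each state event and remembering the snapshot that applies
// *after* that event. Message events simply reuse their parent's snapshot.
type StateMap = BTreeMap<(String, String), String>;

fn track_snapshots(
    start: StateMap,
    // (event_id, Some((kind, state_key))) for state events, None for message events
    missing_oldest_first: &[(String, Option<(String, String)>)],
) -> BTreeMap<String, StateMap> {
    let mut state_snapshot = start;
    let mut snapshot_map = BTreeMap::new();
    for (event_id, state_key) in missing_oldest_first {
        if let Some((kind, key)) = state_key {
            // The state event itself becomes part of the state after it
            state_snapshot.insert((kind.clone(), key.clone()), event_id.clone());
        }
        snapshot_map.insert(event_id.clone(), state_snapshot.clone());
    }
    snapshot_map
}

fn main() {
    let missing = vec![
        ("$create:example.org".to_owned(), Some(("m.room.create".to_owned(), String::new()))),
        ("$msg:example.org".to_owned(), None),
    ];
    let snapshots = track_snapshots(StateMap::new(), &missing);
    // The message event inherits the snapshot that already contains the create event.
    assert_eq!(snapshots["$msg:example.org"].len(), 1);
}
```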
@@ -691,46 +753,9 @@ pub async fn send_transaction_message_route<'a>(
 // TODO: do we need this
 assert_eq!(room_id, missing_pdu.room_id());
-let count = db.globals.next_count()?;
-let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec();
-pdu_id.push(0xff);
-pdu_id.extend_from_slice(&count.to_be_bytes());
-// // Since we know the state from event to event we can do the
-// // slightly more efficient force_state
-// db.rooms.force_state(
-// room_id,
-// state_snapshot
-// .iter()
-// .map(|(k, v)| {
-// (
-// k.clone(),
-// serde_json::to_vec(
-// &db.rooms
-// .get_pdu(v)
-// .expect("db err")
-// .expect("we know of all the pdus"),
-// )
-// .expect("serde can serialize pdus"),
-// )
-// })
-// .collect(),
-// )?;
-db.rooms
-.append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?;
-// Now append the new missing event to DB it will be part of the
-// next events state_snapshot
-db.rooms.append_pdu(
-&PduEvent::from(&*missing_pdu),
-&utils::to_canonical_object(&*missing_pdu)
-.expect("Pdu is valid canonical object"),
-count,
-pdu_id.clone().into(),
-&db.globals,
-&db.account_data,
-&db.admin,
-)?;
+// We can't add to DB yet since we don't know if it passes
+// current room state
+// append_state(&db, &PduEvent::from(&*missing_pdu))?;
 // Only after the state is recorded in the DB can we update the state_snapshot
 // This will update the state snapshot so it is correct next loop through

@@ -738,7 +763,8 @@ pub async fn send_transaction_message_route<'a>(
 (missing_pdu.kind(), missing_pdu.state_key()),
 missing_pdu.event_id(),
 );
-accum_event_map.insert(missing_pdu.event_id(), missing_pdu);
+// Keep track of the state after for resolution
+snapshot_map.insert(missing_pdu.event_id(), state_snapshot.clone());
 }
 Ok(false) => {
 error!(
@@ -754,43 +780,37 @@ pub async fn send_transaction_message_route<'a>(
 // TODO: what to do when missing events fail (not incoming events)
 }
 }
-// All events between the event we know about and the incoming event need to be accounted
-// for
 } else {
 // TODO: Some auth needs to be done for non state events
 if !db
 .rooms
 .is_joined(&missing_pdu.sender, &missing_pdu.room_id)?
 {
-error!("Sender is not joined {}", missing_pdu.kind);
+error!("fork Sender is not joined {}", missing_pdu.kind);
 // TODO: we probably should not be getting events for different rooms
+//
+// I think we need to keep going until we reach
+// the incoming pdu so do not error here
 continue;
 }
-let count = db.globals.next_count()?;
-let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec();
-pdu_id.push(0xff);
-pdu_id.extend_from_slice(&count.to_be_bytes());
-db.rooms.append_to_state(&pdu_id, &missing_pdu)?;
-db.rooms.append_pdu(
-&missing_pdu,
-&utils::to_canonical_object(&missing_pdu)
-.expect("Pdu is valid canonical object"),
-count,
-pdu_id.into(),
-&db.globals,
-&db.account_data,
-&db.admin,
-)?;
+// TODO: a better way to signal non state events...
+snapshot_map.insert(missing_pdu.event_id.clone(), state_snapshot.clone());
 // Continue this inner for loop adding all the missing events
 }
 }
-AuthEvents::Fork(pdus) => {
-for missing_pdu in pdus.into_iter().rev() {
+PrevEvents::Fork(pdus) => {
+let mut state_sets = BTreeSet::new();
+for pdu in pdus {
+let mut state_at = state_snapshot.clone();
+if pdu.state_key.is_some() {
+state_at.insert(
+(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
+pdu.event_id.clone(),
+);
+}
+state_sets.insert(state_at);
+}
+if state_sets.len() <= 1 {
+for missing_pdu in pdus.iter() {
 // For state events
 if missing_pdu.state_key.is_some() {
 let missing_pdu = missing_pdu.convert_for_state_res();

@@ -806,31 +826,12 @@ pub async fn send_transaction_message_route<'a>(
 // TODO: do we need this
 assert_eq!(room_id, missing_pdu.room_id());
-let count = db.globals.next_count()?;
-let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec();
-pdu_id.push(0xff);
-pdu_id.extend_from_slice(&count.to_be_bytes());
-db.rooms
-.append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?;
-db.rooms.append_pdu(
-&PduEvent::from(&*missing_pdu),
-&utils::to_canonical_object(&*missing_pdu)
-.expect("Pdu is valid canonical object"),
-count,
-pdu_id.clone().into(),
-&db.globals,
-&db.account_data,
-&db.admin,
-)?;
-// Only after the state is recorded in the DB can we update the state_snapshot
-// This will update the state snapshot so it is correct next loop through
 state_snapshot.insert(
 (missing_pdu.kind(), missing_pdu.state_key()),
 missing_pdu.event_id(),
 );
-accum_event_map.insert(missing_pdu.event_id(), missing_pdu);
+snapshot_map
+.insert(missing_pdu.event_id(), state_snapshot.clone());
 }
 Ok(false) => {
 error!(
@@ -854,38 +855,118 @@ pub async fn send_transaction_message_route<'a>(
 // TODO: we probably should not be getting events for different rooms
 continue;
 }
+snapshot_map
+.insert(missing_pdu.event_id.clone(), state_snapshot.clone());
+// Continue this inner for loop adding all the missing events
+}
+}
+} else {
+match state_res::StateResolution::resolve(
+room_id,
+&RoomVersionId::Version6,
+&state_sets.into_iter().collect::<Vec<_>>(),
+Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
+&db.rooms,
+) {
+Ok(resolved) => {
+for id in resolved
+.values()
+.filter(|id| pdus.iter().any(|pduid| pduid.event_id == **id))
+{
+snapshot_map.insert(id.clone(), resolved.clone());
+}
+}
+Err(err) => {
+error!("{}", err);
+}
+}
+}
+}
+}
+}
+}
+// We know these events are valid for each state at the event
+// we have already authed them
+let known_fork = if let Some(id) = seen_id {
+db.rooms.state_from(room_id, &id)?
+} else {
+vec![]
+};
+for pdu in known_fork.clone().into_iter().rev() {
+if pdu.state_key.is_some() {
+seen.insert((pdu.kind.clone(), pdu.state_key.clone().unwrap()), pdu);
+}
+}
+let current_room_state = seen
+.into_iter()
+.map(|(k, v)| (k, v.event_id))
+.collect::<BTreeMap<_, _>>();
+let mut state_sets = BTreeSet::new();
+state_sets.insert(current_room_state.clone());
+// The first item is the most recent
+if let Some(fork) = missing.first() {
+for pdu in fork.clone().into_iter() {
+if let Some(map) = snapshot_map.get(&pdu.event_id) {
+state_sets.insert(map.clone());
+}
+}
+}
+let mut state_sets = state_sets.into_iter().collect::<Vec<_>>();
+// If we have actual differences resolve
+if state_sets.len() > 1 {
+// Set the incoming event to have parents from both forks
+// ours/theirs
+pdu.prev_events = missing
+.first()
+.cloned()
+.into_iter()
+.flatten()
+.map(|pdu| pdu.event_id)
+.chain(known_fork.first().cloned().map(|pdu| pdu.event_id))
+.collect();
+// The incoming event is a child of both forks now
+for set in &mut state_sets {
+set.insert(
+(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
+pdu.event_id.clone(),
+);
+}
+// If we have holes or a fork I am less sure what can be guaranteed about our state?
+// Or what must be done to fix holes and forks?
+match state_res::StateResolution::resolve(
+room_id,
+&RoomVersionId::Version6,
+&state_sets,
+Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
+&db.rooms,
+) {
+Ok(resolved) if resolved.values().any(|id| id == &event_id) => {
+for resolved_missing in missing
+.into_iter()
+.rev() // We want the oldest pdu's first
+.flatten()
+.filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id))
+{
 let count = db.globals.next_count()?;
-let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec();
+let mut pdu_id = room_id.as_bytes().to_vec();
 pdu_id.push(0xff);
 pdu_id.extend_from_slice(&count.to_be_bytes());
-db.rooms.append_to_state(&pdu_id, &missing_pdu)?;
 db.rooms.append_pdu(
-&missing_pdu,
-&utils::to_canonical_object(&missing_pdu)
-.expect("Pdu is valid canonical object"),
+&resolved_missing,
+&crate::utils::to_canonical_object(&resolved_missing)
+.expect("PDU is valid canonical JSON"),
 count,
 pdu_id.into(),
 &db.globals,
 &db.account_data,
 &db.admin,
 )?;
-// Continue this inner for loop adding all the missing events
-}
-}
-}
-}
-}
-// Back to the original incoming event
-// If it is a non state event we still must add it and associate a statehash with the pdu_id
-if value.get("state_key").is_none() {
-// TODO: Some auth needs to be done for non state events
-if !db.rooms.is_joined(&pdu.sender, room_id)? {
-error!("Sender is not joined {}", pdu.kind);
-resolved_map.insert(event_id, Err("Sender not found in room".into()));
-continue;
-}
 let count = db.globals.next_count()?;
@@ -893,7 +974,27 @@ pub async fn send_transaction_message_route<'a>(
 pdu_id.push(0xff);
 pdu_id.extend_from_slice(&count.to_be_bytes());
-db.rooms.append_to_state(&pdu_id, &pdu)?;
+// Since we know the state we force_state
+// saving the incoming pdu's id with our new state
+db.rooms.force_state_with_pdu(
+&*pdu_id,
+room_id,
+resolved
+.iter()
+.map(|(k, v)| {
+(
+k.clone(),
+serde_json::to_vec(
+&db.rooms
+.get_pdu(v)
+.expect("db err")
+.expect("we know of all the pdus"),
+)
+.expect("serde can serialize pdus"),
+)
+})
+.collect(),
+)?;
 db.rooms.append_pdu(
 &pdu,
 &value,
@@ -904,41 +1005,28 @@ pub async fn send_transaction_message_route<'a>(
 &db.admin,
 )?;
 resolved_map.insert(event_id, Ok::<(), String>(()));
-continue;
 }
+Ok(resolved) => {
-// If we have iterated through the incoming missing events sequentially we know that
-// the original incoming event is the youngest child and so can be simply authed and append
-// to the state
-// If we have holes or a fork I am less sure what can be guaranteed about our state?
-// Or what must be done to fix holes and forks?
-match state_res::StateResolution::apply_event(
-room_id,
-&RoomVersionId::Version6,
-pdu.convert_for_state_res(), // We know this a state event
-&state_snapshot,
-Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
-&db.rooms,
-) {
-Ok(true) => {
+for resolved_missing in missing
+.into_iter()
+.flatten()
+.filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id))
+{
 let count = db.globals.next_count()?;
 let mut pdu_id = room_id.as_bytes().to_vec();
 pdu_id.push(0xff);
 pdu_id.extend_from_slice(&count.to_be_bytes());
-db.rooms.append_to_state(&pdu_id, &pdu)?;
 db.rooms.append_pdu(
-&pdu,
-&value,
+&resolved_missing,
+&crate::utils::to_canonical_object(&resolved_missing)
+.expect("PDU is valid canonical JSON"),
 count,
 pdu_id.into(),
 &db.globals,
 &db.account_data,
 &db.admin,
 )?;
-resolved_map.insert(event_id, Ok::<(), String>(()));
 }
-Ok(false) => {
 resolved_map.insert(event_id, Err("Failed event auth".into()));
 error!(
 "auth failed: {}",
@@ -950,11 +1038,69 @@ pub async fn send_transaction_message_route<'a>(
 error!("{}", err);
 }
 }
+// If we have iterated through the incoming missing events sequentially we know that
+// the original incoming event is the youngest child and so can be simply authed and appended
+// to the state
+} else {
+// If it is a non state event we still must add it and associate a statehash with the pdu_id
+if pdu.state_key.is_none() {
+// TODO: Some auth needs to be done for non state events
+if !db.rooms.is_joined(&pdu.sender, room_id)? {
+error!("Sender is not joined {}", pdu.kind);
+resolved_map.insert(event_id, Err("Sender not found in room".into()));
+continue 'outer;
+}
+append_state(&db, &pdu)?;
+resolved_map.insert(event_id, Ok::<(), String>(()));
+} else {
+let incoming = pdu.convert_for_state_res();
+match state_res::StateResolution::apply_event(
+room_id,
+&RoomVersionId::Version6,
+incoming,
+&current_room_state,
+Some(accum_event_map), // TODO: make mut and check on Ok(..) ?
+&db.rooms,
+) {
+Ok(true) => {
+append_state(&db, &pdu)?;
+resolved_map.insert(event_id, Ok::<(), String>(()));
+}
+Ok(false) => {
+resolved_map.insert(event_id, Err("Failed event auth".to_string()));
+error!("Failed sequential event auth for incoming");
+}
+Err(err) => {
+resolved_map.insert(event_id, Err(err.to_string()));
+error!("{}", err);
+}
+}
+}
+}
 }
 Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())
 }
+fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
+let count = db.globals.next_count()?;
+let mut pdu_id = pdu.room_id.as_bytes().to_vec();
+pdu_id.push(0xff);
+pdu_id.extend_from_slice(&count.to_be_bytes());
+db.rooms.append_to_state(&pdu_id, pdu)?;
+db.rooms.append_pdu(
+pdu,
+&utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
+count,
+pdu_id.clone().into(),
+&db.globals,
+&db.account_data,
+&db.admin,
+)
+}
 #[cfg_attr(
 feature = "conduit_bin",
 post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
