|
|
|
@ -387,13 +387,25 @@ pub async fn get_public_rooms_route( |
|
|
|
.into()) |
|
|
|
.into()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] |
|
|
|
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] |
|
|
|
pub enum PrevEvents<T> { |
|
|
|
pub enum AuthEvents<T> { |
|
|
|
Sequential(T), |
|
|
|
Sequential(T), |
|
|
|
Fork(Vec<T>), |
|
|
|
Fork(Vec<T>), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl<T: Clone> PrevEvents<T> { |
|
|
|
impl<T> IntoIterator for AuthEvents<T> { |
|
|
|
|
|
|
|
type Item = T; |
|
|
|
|
|
|
|
type IntoIter = std::vec::IntoIter<Self::Item>; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn into_iter(self) -> Self::IntoIter { |
|
|
|
|
|
|
|
match self { |
|
|
|
|
|
|
|
Self::Sequential(item) => vec![item].into_iter(), |
|
|
|
|
|
|
|
Self::Fork(list) => list.into_iter(), |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl<T: Clone> AuthEvents<T> { |
|
|
|
pub fn new(id: &[T]) -> Self { |
|
|
|
pub fn new(id: &[T]) -> Self { |
|
|
|
match id { |
|
|
|
match id { |
|
|
|
[] => panic!("All events must have previous event"), |
|
|
|
[] => panic!("All events must have previous event"), |
|
|
|
@ -490,15 +502,15 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
let mut missing = vec![]; |
|
|
|
let mut missing = vec![]; |
|
|
|
let mut seen = state_res::StateMap::new(); |
|
|
|
let mut seen = state_res::StateMap::new(); |
|
|
|
|
|
|
|
|
|
|
|
let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; |
|
|
|
let mut prev_ids = vec![AuthEvents::new(&pdu.prev_events)]; |
|
|
|
|
|
|
|
|
|
|
|
// This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue
|
|
|
|
// This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue
|
|
|
|
// in the case of a failed request to the server that sent the event
|
|
|
|
// in the case of a failed request to the server that sent the event
|
|
|
|
'inner: loop { |
|
|
|
'inner: loop { |
|
|
|
// TODO: if this is ever more that 1 at a time we must do actual
|
|
|
|
// TODO: if this is ever more than 1 at a time we must do actual
|
|
|
|
// full state resolution not just auth
|
|
|
|
// full state resolution not just auth
|
|
|
|
match prev_ids.pop() { |
|
|
|
match prev_ids.pop() { |
|
|
|
Some(PrevEvents::Sequential(id)) => match db |
|
|
|
Some(AuthEvents::Sequential(id)) => match db |
|
|
|
.rooms |
|
|
|
.rooms |
|
|
|
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? |
|
|
|
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? |
|
|
|
{ |
|
|
|
{ |
|
|
|
@ -528,8 +540,8 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
// TODO: do we need this
|
|
|
|
// TODO: do we need this
|
|
|
|
assert_eq!(room_id, &prev_pdu.room_id); |
|
|
|
assert_eq!(room_id, &prev_pdu.room_id); |
|
|
|
|
|
|
|
|
|
|
|
prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); |
|
|
|
prev_ids.push(AuthEvents::new(&prev_pdu.prev_events)); |
|
|
|
missing.push(PrevEvents::Sequential(prev_pdu)); |
|
|
|
missing.push(AuthEvents::Sequential(prev_pdu)); |
|
|
|
} |
|
|
|
} |
|
|
|
// We can't hard fail because there are some valid errors, just
|
|
|
|
// We can't hard fail because there are some valid errors, just
|
|
|
|
// keep checking PDU's
|
|
|
|
// keep checking PDU's
|
|
|
|
@ -544,7 +556,7 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
}; |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
}, |
|
|
|
Some(PrevEvents::Fork(ids)) => { |
|
|
|
Some(AuthEvents::Fork(ids)) => { |
|
|
|
error!( |
|
|
|
error!( |
|
|
|
"prev_events > 1: {}", |
|
|
|
"prev_events > 1: {}", |
|
|
|
serde_json::to_string_pretty(&pdu).unwrap() |
|
|
|
serde_json::to_string_pretty(&pdu).unwrap() |
|
|
|
@ -609,10 +621,10 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
prev_ids.push(PrevEvents::new( |
|
|
|
prev_ids.push(AuthEvents::new( |
|
|
|
&prev_fork_ids.into_iter().collect::<Vec<_>>(), |
|
|
|
&prev_fork_ids.into_iter().collect::<Vec<_>>(), |
|
|
|
)); |
|
|
|
)); |
|
|
|
missing.push(PrevEvents::new(&missing_fork)); |
|
|
|
missing.push(AuthEvents::new(&missing_fork)); |
|
|
|
} |
|
|
|
} |
|
|
|
// All done finding missing events
|
|
|
|
// All done finding missing events
|
|
|
|
None => { |
|
|
|
None => { |
|
|
|
@ -630,6 +642,32 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
.iter() |
|
|
|
.iter() |
|
|
|
.map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) |
|
|
|
.map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) |
|
|
|
.collect::<BTreeMap<_, Arc<state_res::StateEvent>>>(); |
|
|
|
.collect::<BTreeMap<_, Arc<state_res::StateEvent>>>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !missing.is_empty() { |
|
|
|
|
|
|
|
// This is the state at incoming pdu
|
|
|
|
|
|
|
|
state_snapshot = match state_res::StateResolution::resolve_incoming( |
|
|
|
|
|
|
|
room_id, |
|
|
|
|
|
|
|
&RoomVersionId::Version6, |
|
|
|
|
|
|
|
&state_snapshot, |
|
|
|
|
|
|
|
missing |
|
|
|
|
|
|
|
.iter() |
|
|
|
|
|
|
|
.cloned() |
|
|
|
|
|
|
|
.flatten() |
|
|
|
|
|
|
|
.filter(|pdu| pdu.state_key.is_some()) // remove non state events
|
|
|
|
|
|
|
|
.map(|pdu| ((pdu.kind, pdu.state_key.unwrap()), pdu.event_id)) |
|
|
|
|
|
|
|
.collect(), |
|
|
|
|
|
|
|
Some(accum_event_map), |
|
|
|
|
|
|
|
&db.rooms, |
|
|
|
|
|
|
|
) { |
|
|
|
|
|
|
|
Ok(res) => res, |
|
|
|
|
|
|
|
Err(err) => { |
|
|
|
|
|
|
|
resolved_map.insert(event_id, Err(err.to_string())); |
|
|
|
|
|
|
|
error!("{}", err); |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// TODO: this only accounts for sequentially missing events no holes will be filled
|
|
|
|
// TODO: this only accounts for sequentially missing events no holes will be filled
|
|
|
|
// and I'm still not sure what happens when a fork introduces multiple `prev_events`
|
|
|
|
// and I'm still not sure what happens when a fork introduces multiple `prev_events`
|
|
|
|
//
|
|
|
|
//
|
|
|
|
@ -637,7 +675,7 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
// prev_event of the incoming event so we reverse the order oldest -> most recent
|
|
|
|
// prev_event of the incoming event so we reverse the order oldest -> most recent
|
|
|
|
for missing_pdu in missing.into_iter().rev() { |
|
|
|
for missing_pdu in missing.into_iter().rev() { |
|
|
|
match missing_pdu { |
|
|
|
match missing_pdu { |
|
|
|
PrevEvents::Sequential(missing_pdu) => { |
|
|
|
AuthEvents::Sequential(missing_pdu) => { |
|
|
|
// For state events
|
|
|
|
// For state events
|
|
|
|
if missing_pdu.state_key.is_some() { |
|
|
|
if missing_pdu.state_key.is_some() { |
|
|
|
let missing_pdu = missing_pdu.convert_for_state_res(); |
|
|
|
let missing_pdu = missing_pdu.convert_for_state_res(); |
|
|
|
@ -751,7 +789,7 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
// Continue this inner for loop adding all the missing events
|
|
|
|
// Continue this inner for loop adding all the missing events
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
PrevEvents::Fork(pdus) => { |
|
|
|
AuthEvents::Fork(pdus) => { |
|
|
|
for missing_pdu in pdus.into_iter().rev() { |
|
|
|
for missing_pdu in pdus.into_iter().rev() { |
|
|
|
// For state events
|
|
|
|
// For state events
|
|
|
|
if missing_pdu.state_key.is_some() { |
|
|
|
if missing_pdu.state_key.is_some() { |
|
|
|
|