From 558905bba487361ddd904788e7383db4ca9ebdda Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 14 Dec 2020 16:15:22 -0500 Subject: [PATCH] WIP: use resolve_incoming from state-res crate --- src/server_server.rs | 64 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index c4478d3..144ea0a 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -387,13 +387,25 @@ pub async fn get_public_rooms_route( .into()) } -#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum PrevEvents { +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum AuthEvents { Sequential(T), Fork(Vec), } -impl PrevEvents { +impl IntoIterator for AuthEvents { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + match self { + Self::Sequential(item) => vec![item].into_iter(), + Self::Fork(list) => list.into_iter(), + } + } +} + +impl AuthEvents { pub fn new(id: &[T]) -> Self { match id { [] => panic!("All events must have previous event"), @@ -490,15 +502,15 @@ pub async fn send_transaction_message_route<'a>( let mut missing = vec![]; let mut seen = state_res::StateMap::new(); - let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; + let mut prev_ids = vec![AuthEvents::new(&pdu.prev_events)]; // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue // in the case of a failed request to the server that sent the event 'inner: loop { - // TODO: if this is ever more that 1 at a time we must do actual + // TODO: if this is ever more than 1 at a time we must do actual // full state resolution not just auth match prev_ids.pop() { - Some(PrevEvents::Sequential(id)) => match db + Some(AuthEvents::Sequential(id)) => match db .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { @@ -528,8 +540,8 @@ pub async fn send_transaction_message_route<'a>( // TODO: do we need this assert_eq!(room_id, &prev_pdu.room_id); - prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); - missing.push(PrevEvents::Sequential(prev_pdu)); + prev_ids.push(AuthEvents::new(&prev_pdu.prev_events)); + missing.push(AuthEvents::Sequential(prev_pdu)); } // We can't hard fail because there are some valid errors, just // keep checking PDU's @@ -544,7 +556,7 @@ pub async fn send_transaction_message_route<'a>( }; } }, - Some(PrevEvents::Fork(ids)) => { + Some(AuthEvents::Fork(ids)) => { error!( "prev_events > 1: {}", serde_json::to_string_pretty(&pdu).unwrap() @@ -609,10 +621,10 @@ pub async fn send_transaction_message_route<'a>( }, } } - prev_ids.push(PrevEvents::new( + prev_ids.push(AuthEvents::new( &prev_fork_ids.into_iter().collect::>(), )); - missing.push(PrevEvents::new(&missing_fork)); + missing.push(AuthEvents::new(&missing_fork)); } // All done finding missing events None => { @@ -630,6 +642,32 @@ pub async fn send_transaction_message_route<'a>( .iter() .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) .collect::>>(); + + if !missing.is_empty() { + // This is the state at incoming pdu + state_snapshot = match state_res::StateResolution::resolve_incoming( + room_id, + &RoomVersionId::Version6, + &state_snapshot, + missing + .iter() + .cloned() + .flatten() + .filter(|pdu| pdu.state_key.is_some()) // remove non state events + .map(|pdu| ((pdu.kind, pdu.state_key.unwrap()), pdu.event_id)) + .collect(), + Some(accum_event_map), + &db.rooms, + ) { + Ok(res) => res, + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + continue; + } + } + } + // TODO: this only accounts for sequentially missing events no holes will be filled // and I'm still not sure what happens when a fork introduces multiple `prev_events` // @@ -637,7 +675,7 @@ pub async fn send_transaction_message_route<'a>( // prev_event of the incoming event so we reverse the order oldest -> most recent for missing_pdu in missing.into_iter().rev() { match missing_pdu { - PrevEvents::Sequential(missing_pdu) => { + AuthEvents::Sequential(missing_pdu) => { // For state events if missing_pdu.state_key.is_some() { let missing_pdu = missing_pdu.convert_for_state_res(); @@ -751,7 +789,7 @@ pub async fn send_transaction_message_route<'a>( // Continue this inner for loop adding all the missing events } } - PrevEvents::Fork(pdus) => { + AuthEvents::Fork(pdus) => { for missing_pdu in pdus.into_iter().rev() { // For state events if missing_pdu.state_key.is_some() {