From 2864a7097f571c3dfa62f2e8fbcc47c63d9b2e05 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 7 Dec 2020 10:32:09 -0500 Subject: [PATCH] Wip work to correctly resolve incoming PDUs --- src/database/rooms.rs | 2 +- src/server_server.rs | 103 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index fb139a6..5080594 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -205,7 +205,7 @@ impl Rooms { }) } - /// Returns the last state hash key added to the db. + /// Returns the state hash key for the given pdu. pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result> { Ok(self.pduid_statehash.get(pdu_id)?) } diff --git a/src/server_server.rs b/src/server_server.rs index da046d3..0f0c1a4 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -466,6 +466,68 @@ pub async fn send_transaction_message_route<'a>( continue; } + // The events that must be resolved to catch up to the incoming event + let mut missing = BTreeMap::new(); + let mut seen = BTreeMap::new(); + + let mut prev_ids = pdu.prev_events.to_vec(); + // Don't kill our server with state-res + if prev_ids.len() > 20 { + resolved_map.insert( + event_id, + Err("Event has abnormally large prev_events count".into()), + ); + continue; + } + + while let Some(id) = prev_ids.pop() { + match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + Some(state_hash) => { + seen = db.rooms.state_full(&state_hash)?; + break; + } + None => { + // TODO: as soon as https://github.com/ruma/ruma/pull/364 is accepted + // use `::get_event::v1::Request...` + let last_event = vec![id.clone()]; + let mut req = + ruma::api::federation::event::get_missing_events::v1::Request::new( + room_id, + &[], + &last_event, + ); + req.limit = ruma::uint!(1); + // We have a state event so we need info for state-res + let get_event = match send_request(&db.globals, body.body.origin.clone(), req) + .await + { + Ok(res) => { + for ev in &res.events { + let (id, val) = crate::pdu::process_incoming_pdu(ev); + let pdu = state_res::StateEvent::from_id_canon_obj(id.clone(), val) + .expect("Pdu is a valid StateEvent"); + + prev_ids.extend(pdu.prev_event_ids()); + missing.insert(id, pdu); + } + } + // We can't hard fail because there are some valid errors, just + // keep checking PDU's + // + // As an example a possible error + // {"errcode":"M_FORBIDDEN","error":"Host not in room."} + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + continue; + } + }; + } + } + } + // If it is not a state event, we can skip state-res... maybe if value.get("state_key").is_none() { if !db.rooms.is_joined(&pdu.sender, room_id)? { @@ -480,7 +542,6 @@ pub async fn send_transaction_message_route<'a>( pdu_id.extend_from_slice(&count.to_be_bytes()); db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_pdu( &pdu, &value, @@ -521,7 +582,7 @@ pub async fn send_transaction_message_route<'a>( let their_current_state = get_state_response .pdus .iter() - .chain(get_state_response.auth_chain.iter()) // add auth events + // .chain(get_state_response.auth_chain.iter()) // add auth events .map(|pdu| { let (event_id, json) = crate::pdu::process_incoming_pdu(pdu); ( @@ -535,6 +596,15 @@ pub async fn send_transaction_message_route<'a>( ), ) }) + // Add the incoming event to their state this will ensure it is within the + // resolved state if indeed it passes state-res + .chain(Some(( + event_id.clone(), + Arc::new( + state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone()) + .expect("valid pdu json"), + ), + ))) .collect::>(); let our_current_state = db.rooms.room_state_full(room_id)?; @@ -554,6 +624,18 @@ pub async fn send_transaction_message_route<'a>( their_current_state .iter() .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id())) + // We must ensure that our incoming event is part of state-res and not + // accidentally removed from the BTree because of being sorted first by + // event_id + .chain(Some(( + ( + pdu.kind.clone(), + pdu.state_key + .clone() + .expect("Found state event without state_key"), + ), + pdu.event_id.clone(), + ))) .collect::>(), ], Some( @@ -570,6 +652,14 @@ pub async fn send_transaction_message_route<'a>( &db.rooms, ) { Ok(resolved) if resolved.values().any(|id| &event_id == id) => { + let res_state = resolved.iter().map(|(k, v)| { + Ok::<_, Error>(( + k.clone(), + db.rooms + .get_pdu(v)? + .ok_or_else(|| Error::bad_database("Pdu with eventId not found"))?, + )) + }); // If the event is older than the last event in pduid_pdu Tree then find the // closest ancestor we know of and insert after the known ancestor by // altering the known events pduid to = same roomID + same count bytes + 0x1 @@ -585,6 +675,8 @@ pub async fn send_transaction_message_route<'a>( pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( &pdu, &value, @@ -604,6 +696,8 @@ pub async fn send_transaction_message_route<'a>( // the pdu appended after pdu_id.push(1); + db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( &pdu, &value, @@ -623,7 +717,8 @@ pub async fn send_transaction_message_route<'a>( resolved_map.insert(event_id, Ok::<(), String>(())); } // If the eventId is not found in the resolved state auth has failed - Ok(_) => { + Ok(res) => { + dbg!(res); resolved_map.insert( event_id, Err("This event failed authentication, not found in resolved set".into()), @@ -635,7 +730,7 @@ pub async fn send_transaction_message_route<'a>( }; } - Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) + Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) } #[cfg_attr(