Browse Source

Wip work to correctly resolve incoming PDUs

merge-requests/10/head
Devin Ragotzy 5 years ago
parent
commit
2864a7097f
  1. 2
      src/database/rooms.rs
  2. 103
      src/server_server.rs

2
src/database/rooms.rs

@ -205,7 +205,7 @@ impl Rooms { @@ -205,7 +205,7 @@ impl Rooms {
})
}
/// Returns the last state hash key added to the db.
/// Returns the state hash key for the given pdu.
pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> {
Ok(self.pduid_statehash.get(pdu_id)?)
}

103
src/server_server.rs

@ -466,6 +466,68 @@ pub async fn send_transaction_message_route<'a>( @@ -466,6 +466,68 @@ pub async fn send_transaction_message_route<'a>(
continue;
}
// The events that must be resolved to catch up to the incoming event
let mut missing = BTreeMap::new();
let mut seen = BTreeMap::new();
let mut prev_ids = pdu.prev_events.to_vec();
// Don't kill our server with state-res
if prev_ids.len() > 20 {
resolved_map.insert(
event_id,
Err("Event has abnormally large prev_events count".into()),
);
continue;
}
while let Some(id) = prev_ids.pop() {
match db
.rooms
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
{
Some(state_hash) => {
seen = db.rooms.state_full(&state_hash)?;
break;
}
None => {
// TODO: as soon as https://github.com/ruma/ruma/pull/364 is accepted
// use `::get_event::v1::Request...`
let last_event = vec![id.clone()];
let mut req =
ruma::api::federation::event::get_missing_events::v1::Request::new(
room_id,
&[],
&last_event,
);
req.limit = ruma::uint!(1);
// We have a state event so we need info for state-res
let get_event = match send_request(&db.globals, body.body.origin.clone(), req)
.await
{
Ok(res) => {
for ev in &res.events {
let (id, val) = crate::pdu::process_incoming_pdu(ev);
let pdu = state_res::StateEvent::from_id_canon_obj(id.clone(), val)
.expect("Pdu is a valid StateEvent");
prev_ids.extend(pdu.prev_event_ids());
missing.insert(id, pdu);
}
}
// We can't hard fail because there are some valid errors, just
// keep checking PDU's
//
// As an example a possible error
// {"errcode":"M_FORBIDDEN","error":"Host not in room."}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
continue;
}
};
}
}
}
// If it is not a state event, we can skip state-res... maybe
if value.get("state_key").is_none() {
if !db.rooms.is_joined(&pdu.sender, room_id)? {
@ -480,7 +542,6 @@ pub async fn send_transaction_message_route<'a>( @@ -480,7 +542,6 @@ pub async fn send_transaction_message_route<'a>(
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_to_state(&pdu_id, &pdu)?;
db.rooms.append_pdu(
&pdu,
&value,
@ -521,7 +582,7 @@ pub async fn send_transaction_message_route<'a>( @@ -521,7 +582,7 @@ pub async fn send_transaction_message_route<'a>(
let their_current_state = get_state_response
.pdus
.iter()
.chain(get_state_response.auth_chain.iter()) // add auth events
// .chain(get_state_response.auth_chain.iter()) // add auth events
.map(|pdu| {
let (event_id, json) = crate::pdu::process_incoming_pdu(pdu);
(
@ -535,6 +596,15 @@ pub async fn send_transaction_message_route<'a>( @@ -535,6 +596,15 @@ pub async fn send_transaction_message_route<'a>(
),
)
})
// Add the incoming event to their state this will ensure it is within the
// resolved state if indeed it passes state-res
.chain(Some((
event_id.clone(),
Arc::new(
state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone())
.expect("valid pdu json"),
),
)))
.collect::<BTreeMap<_, _>>();
let our_current_state = db.rooms.room_state_full(room_id)?;
@ -554,6 +624,18 @@ pub async fn send_transaction_message_route<'a>( @@ -554,6 +624,18 @@ pub async fn send_transaction_message_route<'a>(
their_current_state
.iter()
.map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id()))
// We must ensure that our incoming event is part of state-res and not
// accidentally removed from the BTree because of being sorted first by
// event_id
.chain(Some((
(
pdu.kind.clone(),
pdu.state_key
.clone()
.expect("Found state event without state_key"),
),
pdu.event_id.clone(),
)))
.collect::<BTreeMap<_, _>>(),
],
Some(
@ -570,6 +652,14 @@ pub async fn send_transaction_message_route<'a>( @@ -570,6 +652,14 @@ pub async fn send_transaction_message_route<'a>(
&db.rooms,
) {
Ok(resolved) if resolved.values().any(|id| &event_id == id) => {
let res_state = resolved.iter().map(|(k, v)| {
Ok::<_, Error>((
k.clone(),
db.rooms
.get_pdu(v)?
.ok_or_else(|| Error::bad_database("Pdu with eventId not found"))?,
))
});
// If the event is older than the last event in pduid_pdu Tree then find the
// closest ancestor we know of and insert after the known ancestor by
// altering the known events pduid to = same roomID + same count bytes + 0x1
@ -585,6 +675,8 @@ pub async fn send_transaction_message_route<'a>( @@ -585,6 +675,8 @@ pub async fn send_transaction_message_route<'a>(
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_to_state(&pdu_id, &pdu)?;
db.rooms.append_pdu(
&pdu,
&value,
@ -604,6 +696,8 @@ pub async fn send_transaction_message_route<'a>( @@ -604,6 +696,8 @@ pub async fn send_transaction_message_route<'a>(
// the pdu appended after
pdu_id.push(1);
db.rooms.append_to_state(&pdu_id, &pdu)?;
db.rooms.append_pdu(
&pdu,
&value,
@ -623,7 +717,8 @@ pub async fn send_transaction_message_route<'a>( @@ -623,7 +717,8 @@ pub async fn send_transaction_message_route<'a>(
resolved_map.insert(event_id, Ok::<(), String>(()));
}
// If the eventId is not found in the resolved state auth has failed
Ok(_) => {
Ok(res) => {
dbg!(res);
resolved_map.insert(
event_id,
Err("This event failed authentication, not found in resolved set".into()),
@ -635,7 +730,7 @@ pub async fn send_transaction_message_route<'a>( @@ -635,7 +730,7 @@ pub async fn send_transaction_message_route<'a>(
};
}
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())
}
#[cfg_attr(

Loading…
Cancel
Save