|
|
|
|
@ -867,17 +867,19 @@ pub async fn handle_incoming_pdu<'a>(
@@ -867,17 +867,19 @@ pub async fn handle_incoming_pdu<'a>(
|
|
|
|
|
.map_err(|_| "Failed to ask database for event.".to_owned())? |
|
|
|
|
.ok_or_else(|| "Failed to find create event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
let (incoming_pdu, val) = handle_outlier_pdu(origin, &create_event, event_id, room_id, value, db, pub_key_map).await?; |
|
|
|
|
let (incoming_pdu, val) = handle_outlier_pdu( |
|
|
|
|
origin, |
|
|
|
|
&create_event, |
|
|
|
|
event_id, |
|
|
|
|
room_id, |
|
|
|
|
value, |
|
|
|
|
db, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
// 8. if not timeline event: stop
|
|
|
|
|
if !is_timeline_event |
|
|
|
|
|| incoming_pdu.origin_server_ts |
|
|
|
|
< db.rooms |
|
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
|
.map_err(|_| "Error loading first room event.".to_owned())? |
|
|
|
|
.expect("Room exists") |
|
|
|
|
.origin_server_ts |
|
|
|
|
{ |
|
|
|
|
if !is_timeline_event { |
|
|
|
|
return Ok(None); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -893,16 +895,45 @@ pub async fn handle_incoming_pdu<'a>(
@@ -893,16 +895,45 @@ pub async fn handle_incoming_pdu<'a>(
|
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await.pop() { |
|
|
|
|
todo_timeline_stack.push((pdu, json)); |
|
|
|
|
.await |
|
|
|
|
.pop() |
|
|
|
|
{ |
|
|
|
|
if incoming_pdu.origin_server_ts |
|
|
|
|
> db.rooms |
|
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
|
.map_err(|_| "Error loading first room event.".to_owned())? |
|
|
|
|
.expect("Room exists") |
|
|
|
|
.origin_server_ts |
|
|
|
|
{ |
|
|
|
|
todo_outlier_stack.extend(pdu.prev_events.iter().cloned()); |
|
|
|
|
todo_timeline_stack.push((pdu, json)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while let Some(prev) = todo_timeline_stack.pop() { |
|
|
|
|
upgrade_outlier_to_timeline_pdu(prev.0, prev.1, &create_event, origin, db, room_id, pub_key_map).await?; |
|
|
|
|
upgrade_outlier_to_timeline_pdu( |
|
|
|
|
prev.0, |
|
|
|
|
prev.1, |
|
|
|
|
&create_event, |
|
|
|
|
origin, |
|
|
|
|
db, |
|
|
|
|
room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, db, room_id, pub_key_map).await |
|
|
|
|
upgrade_outlier_to_timeline_pdu( |
|
|
|
|
incoming_pdu, |
|
|
|
|
val, |
|
|
|
|
&create_event, |
|
|
|
|
origin, |
|
|
|
|
db, |
|
|
|
|
room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn handle_outlier_pdu<'a>( |
|
|
|
|
@ -913,7 +944,8 @@ fn handle_outlier_pdu<'a>(
@@ -913,7 +944,8 @@ fn handle_outlier_pdu<'a>(
|
|
|
|
|
value: BTreeMap<String, CanonicalJsonValue>, |
|
|
|
|
db: &'a Database, |
|
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> { |
|
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> |
|
|
|
|
{ |
|
|
|
|
Box::pin(async move { |
|
|
|
|
let start_time = Instant::now(); |
|
|
|
|
|
|
|
|
|
@ -928,11 +960,11 @@ fn handle_outlier_pdu<'a>(
@@ -928,11 +960,11 @@ fn handle_outlier_pdu<'a>(
|
|
|
|
|
// 2. Check signatures, otherwise drop
|
|
|
|
|
// 3. check content hash, redact if doesn't match
|
|
|
|
|
|
|
|
|
|
let create_event_content = |
|
|
|
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |
|
|
|
|
.expect("Raw::from_value always works.") |
|
|
|
|
.deserialize() |
|
|
|
|
.map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; |
|
|
|
|
let create_event_content = |
|
|
|
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |
|
|
|
|
.expect("Raw::from_value always works.") |
|
|
|
|
.deserialize() |
|
|
|
|
.map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
let room_version_id = &create_event_content.room_version; |
|
|
|
|
let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); |
|
|
|
|
@ -1062,9 +1094,8 @@ fn handle_outlier_pdu<'a>(
@@ -1062,9 +1094,8 @@ fn handle_outlier_pdu<'a>(
|
|
|
|
|
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?; |
|
|
|
|
debug!("Added pdu as outlier."); |
|
|
|
|
|
|
|
|
|
Ok((incoming_pdu,val)) |
|
|
|
|
Ok((incoming_pdu, val)) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
|
@ -1076,381 +1107,385 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1076,381 +1107,385 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
room_id: &RoomId, |
|
|
|
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
|
) -> StdResult<Option<Vec<u8>>, String> { |
|
|
|
|
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
|
|
|
|
// doing all the checks in this list starting at 1. These are not timeline events.
|
|
|
|
|
|
|
|
|
|
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
|
|
|
|
|
// the state from a known point and resolve if > 1 prev_event
|
|
|
|
|
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
|
|
|
|
// doing all the checks in this list starting at 1. These are not timeline events.
|
|
|
|
|
|
|
|
|
|
debug!("Requesting state at event."); |
|
|
|
|
let mut state_at_incoming_event = None; |
|
|
|
|
|
|
|
|
|
if incoming_pdu.prev_events.len() == 1 { |
|
|
|
|
let prev_event = &incoming_pdu.prev_events[0]; |
|
|
|
|
let prev_event_sstatehash = db |
|
|
|
|
.rooms |
|
|
|
|
.pdu_shortstatehash(prev_event) |
|
|
|
|
.map_err(|_| "Failed talking to db".to_owned())?; |
|
|
|
|
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
|
|
|
|
|
// the state from a known point and resolve if > 1 prev_event
|
|
|
|
|
|
|
|
|
|
let state = |
|
|
|
|
prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); |
|
|
|
|
debug!("Requesting state at event."); |
|
|
|
|
let mut state_at_incoming_event = None; |
|
|
|
|
|
|
|
|
|
if let Some(Ok(state)) = state { |
|
|
|
|
warn!("Using cached state"); |
|
|
|
|
let mut state = fetch_and_handle_outliers( |
|
|
|
|
db, |
|
|
|
|
origin, |
|
|
|
|
&state.into_iter().collect::<Vec<_>>(), |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|(pdu,_)| { |
|
|
|
|
if incoming_pdu.prev_events.len() == 1 { |
|
|
|
|
let prev_event = &incoming_pdu.prev_events[0]; |
|
|
|
|
let prev_event_sstatehash = db |
|
|
|
|
.rooms |
|
|
|
|
.pdu_shortstatehash(prev_event) |
|
|
|
|
.map_err(|_| "Failed talking to db".to_owned())?; |
|
|
|
|
|
|
|
|
|
let state = |
|
|
|
|
prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); |
|
|
|
|
|
|
|
|
|
if let Some(Ok(state)) = state { |
|
|
|
|
warn!("Using cached state"); |
|
|
|
|
let mut state = fetch_and_handle_outliers( |
|
|
|
|
db, |
|
|
|
|
origin, |
|
|
|
|
&state.into_iter().collect::<Vec<_>>(), |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|(pdu, _)| { |
|
|
|
|
( |
|
|
|
|
( |
|
|
|
|
( |
|
|
|
|
pdu.kind.clone(), |
|
|
|
|
pdu.state_key |
|
|
|
|
.clone() |
|
|
|
|
.expect("events from state_full_ids are state events"), |
|
|
|
|
), |
|
|
|
|
pdu, |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
.collect::<HashMap<_, _>>(); |
|
|
|
|
pdu.kind.clone(), |
|
|
|
|
pdu.state_key |
|
|
|
|
.clone() |
|
|
|
|
.expect("events from state_full_ids are state events"), |
|
|
|
|
), |
|
|
|
|
pdu, |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
.collect::<HashMap<_, _>>(); |
|
|
|
|
|
|
|
|
|
let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { |
|
|
|
|
let prev_pdu = |
|
|
|
|
db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { |
|
|
|
|
"Could not find prev event, but we know the state.".to_owned() |
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
if let Some(state_key) = &prev_pdu.state_key { |
|
|
|
|
state.insert((prev_pdu.kind.clone(), state_key.clone()), prev_pdu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
state_at_incoming_event = Some(state); |
|
|
|
|
if let Some(state_key) = &prev_pdu.state_key { |
|
|
|
|
state.insert((prev_pdu.kind.clone(), state_key.clone()), prev_pdu); |
|
|
|
|
} |
|
|
|
|
// TODO: set incoming_auth_events?
|
|
|
|
|
|
|
|
|
|
state_at_incoming_event = Some(state); |
|
|
|
|
} |
|
|
|
|
// TODO: set incoming_auth_events?
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if state_at_incoming_event.is_none() { |
|
|
|
|
warn!("Calling /state_ids"); |
|
|
|
|
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
|
|
|
|
// response to some extend, but we still do a lot of checks on the events
|
|
|
|
|
match db |
|
|
|
|
.sending |
|
|
|
|
.send_federation_request( |
|
|
|
|
&db.globals, |
|
|
|
|
if state_at_incoming_event.is_none() { |
|
|
|
|
warn!("Calling /state_ids"); |
|
|
|
|
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
|
|
|
|
// response to some extend, but we still do a lot of checks on the events
|
|
|
|
|
match db |
|
|
|
|
.sending |
|
|
|
|
.send_federation_request( |
|
|
|
|
&db.globals, |
|
|
|
|
origin, |
|
|
|
|
get_room_state_ids::v1::Request { |
|
|
|
|
room_id: &room_id, |
|
|
|
|
event_id: &incoming_pdu.event_id, |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(res) => { |
|
|
|
|
debug!("Fetching state events at event."); |
|
|
|
|
let state_vec = fetch_and_handle_outliers( |
|
|
|
|
&db, |
|
|
|
|
origin, |
|
|
|
|
get_room_state_ids::v1::Request { |
|
|
|
|
room_id: &room_id, |
|
|
|
|
event_id: &incoming_pdu.event_id, |
|
|
|
|
}, |
|
|
|
|
&res.pdu_ids, |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(res) => { |
|
|
|
|
debug!("Fetching state events at event."); |
|
|
|
|
let state_vec = fetch_and_handle_outliers( |
|
|
|
|
&db, |
|
|
|
|
origin, |
|
|
|
|
&res.pdu_ids, |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
let mut state = HashMap::new(); |
|
|
|
|
for (pdu, _) in state_vec { |
|
|
|
|
match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { |
|
|
|
|
Entry::Vacant(v) => { |
|
|
|
|
v.insert(pdu); |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(_) => { |
|
|
|
|
return Err( |
|
|
|
|
"State event's type and state_key combination exists multiple times.".to_owned(), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
let mut state = HashMap::new(); |
|
|
|
|
for (pdu, _) in state_vec { |
|
|
|
|
match state.entry(( |
|
|
|
|
pdu.kind.clone(), |
|
|
|
|
pdu.state_key |
|
|
|
|
.clone() |
|
|
|
|
.ok_or_else(|| "Found non-state pdu in state events.".to_owned())?, |
|
|
|
|
)) { |
|
|
|
|
Entry::Vacant(v) => { |
|
|
|
|
v.insert(pdu); |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(_) => return Err( |
|
|
|
|
"State event's type and state_key combination exists multiple times." |
|
|
|
|
.to_owned(), |
|
|
|
|
), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The original create event must still be in the state
|
|
|
|
|
if state |
|
|
|
|
.get(&(EventType::RoomCreate, "".to_owned())) |
|
|
|
|
.map(|a| a.as_ref()) |
|
|
|
|
!= Some(&create_event) |
|
|
|
|
{ |
|
|
|
|
return Err("Incoming event refers to wrong create event.".to_owned()); |
|
|
|
|
} |
|
|
|
|
// The original create event must still be in the state
|
|
|
|
|
if state |
|
|
|
|
.get(&(EventType::RoomCreate, "".to_owned())) |
|
|
|
|
.map(|a| a.as_ref()) |
|
|
|
|
!= Some(&create_event) |
|
|
|
|
{ |
|
|
|
|
return Err("Incoming event refers to wrong create event.".to_owned()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
debug!("Fetching auth chain events at event."); |
|
|
|
|
fetch_and_handle_outliers( |
|
|
|
|
&db, |
|
|
|
|
origin, |
|
|
|
|
&res.auth_chain_ids, |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
debug!("Fetching auth chain events at event."); |
|
|
|
|
fetch_and_handle_outliers( |
|
|
|
|
&db, |
|
|
|
|
origin, |
|
|
|
|
&res.auth_chain_ids, |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
state_at_incoming_event = Some(state); |
|
|
|
|
} |
|
|
|
|
Err(_) => { |
|
|
|
|
return Err("Fetching state for event failed".into()); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
state_at_incoming_event = Some(state); |
|
|
|
|
} |
|
|
|
|
Err(_) => { |
|
|
|
|
return Err("Fetching state for event failed".into()); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let state_at_incoming_event = |
|
|
|
|
state_at_incoming_event.expect("we always set this to some above"); |
|
|
|
|
let state_at_incoming_event = |
|
|
|
|
state_at_incoming_event.expect("we always set this to some above"); |
|
|
|
|
|
|
|
|
|
// 11. Check the auth of the event passes based on the state of the event
|
|
|
|
|
// 11. Check the auth of the event passes based on the state of the event
|
|
|
|
|
let create_event_content = |
|
|
|
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |
|
|
|
|
.expect("Raw::from_value always works.") |
|
|
|
|
.deserialize() |
|
|
|
|
.map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
let room_version_id = &create_event_content.room_version; |
|
|
|
|
let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); |
|
|
|
|
|
|
|
|
|
// If the previous event was the create event special rules apply
|
|
|
|
|
let previous_create = if incoming_pdu.auth_events.len() == 1 |
|
|
|
|
&& incoming_pdu.prev_events == incoming_pdu.auth_events |
|
|
|
|
{ |
|
|
|
|
db.rooms |
|
|
|
|
.get_pdu(&incoming_pdu.auth_events[0]) |
|
|
|
|
.map_err(|e| e.to_string())? |
|
|
|
|
.filter(|maybe_create| **maybe_create == *create_event) |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
}; |
|
|
|
|
let room_version_id = &create_event_content.room_version; |
|
|
|
|
let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); |
|
|
|
|
|
|
|
|
|
// If the previous event was the create event special rules apply
|
|
|
|
|
let previous_create = if incoming_pdu.auth_events.len() == 1 |
|
|
|
|
&& incoming_pdu.prev_events == incoming_pdu.auth_events |
|
|
|
|
{ |
|
|
|
|
db.rooms |
|
|
|
|
.get_pdu(&incoming_pdu.auth_events[0]) |
|
|
|
|
.map_err(|e| e.to_string())? |
|
|
|
|
.filter(|maybe_create| **maybe_create == *create_event) |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if !state_res::event_auth::auth_check( |
|
|
|
|
&room_version, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
previous_create.clone(), |
|
|
|
|
&state_at_incoming_event, |
|
|
|
|
None, // TODO: third party invite
|
|
|
|
|
) |
|
|
|
|
.map_err(|_e| "Auth check failed.".to_owned())? |
|
|
|
|
{ |
|
|
|
|
return Err("Event has failed auth check with state at the event.".into()); |
|
|
|
|
} |
|
|
|
|
debug!("Auth check succeeded."); |
|
|
|
|
if !state_res::event_auth::auth_check( |
|
|
|
|
&room_version, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
previous_create.clone(), |
|
|
|
|
&state_at_incoming_event, |
|
|
|
|
None, // TODO: third party invite
|
|
|
|
|
) |
|
|
|
|
.map_err(|_e| "Auth check failed.".to_owned())? |
|
|
|
|
{ |
|
|
|
|
return Err("Event has failed auth check with state at the event.".into()); |
|
|
|
|
} |
|
|
|
|
debug!("Auth check succeeded."); |
|
|
|
|
|
|
|
|
|
// We start looking at current room state now, so lets lock the room
|
|
|
|
|
// We start looking at current room state now, so lets lock the room
|
|
|
|
|
|
|
|
|
|
let mutex_state = Arc::clone( |
|
|
|
|
db.globals |
|
|
|
|
.roomid_mutex_state |
|
|
|
|
.write() |
|
|
|
|
.unwrap() |
|
|
|
|
.entry(room_id.clone()) |
|
|
|
|
.or_default(), |
|
|
|
|
); |
|
|
|
|
let state_lock = mutex_state.lock().await; |
|
|
|
|
let mutex_state = Arc::clone( |
|
|
|
|
db.globals |
|
|
|
|
.roomid_mutex_state |
|
|
|
|
.write() |
|
|
|
|
.unwrap() |
|
|
|
|
.entry(room_id.clone()) |
|
|
|
|
.or_default(), |
|
|
|
|
); |
|
|
|
|
let state_lock = mutex_state.lock().await; |
|
|
|
|
|
|
|
|
|
// Now we calculate the set of extremities this room has after the incoming event has been
|
|
|
|
|
// applied. We start with the previous extremities (aka leaves)
|
|
|
|
|
let mut extremities = db |
|
|
|
|
.rooms |
|
|
|
|
.get_pdu_leaves(&room_id) |
|
|
|
|
.map_err(|_| "Failed to load room leaves".to_owned())?; |
|
|
|
|
// Now we calculate the set of extremities this room has after the incoming event has been
|
|
|
|
|
// applied. We start with the previous extremities (aka leaves)
|
|
|
|
|
let mut extremities = db |
|
|
|
|
.rooms |
|
|
|
|
.get_pdu_leaves(&room_id) |
|
|
|
|
.map_err(|_| "Failed to load room leaves".to_owned())?; |
|
|
|
|
|
|
|
|
|
// Remove any forward extremities that are referenced by this incoming event's prev_events
|
|
|
|
|
for prev_event in &incoming_pdu.prev_events { |
|
|
|
|
if extremities.contains(prev_event) { |
|
|
|
|
extremities.remove(prev_event); |
|
|
|
|
} |
|
|
|
|
// Remove any forward extremities that are referenced by this incoming event's prev_events
|
|
|
|
|
for prev_event in &incoming_pdu.prev_events { |
|
|
|
|
if extremities.contains(prev_event) { |
|
|
|
|
extremities.remove(prev_event); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Only keep those extremities were not referenced yet
|
|
|
|
|
extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true))); |
|
|
|
|
// Only keep those extremities were not referenced yet
|
|
|
|
|
extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true))); |
|
|
|
|
|
|
|
|
|
let mut extremity_statehashes = Vec::new(); |
|
|
|
|
let mut extremity_statehashes = Vec::new(); |
|
|
|
|
|
|
|
|
|
for id in &extremities { |
|
|
|
|
match db |
|
|
|
|
.rooms |
|
|
|
|
.get_pdu(&id) |
|
|
|
|
.map_err(|_| "Failed to ask db for pdu.".to_owned())? |
|
|
|
|
{ |
|
|
|
|
Some(leaf_pdu) => { |
|
|
|
|
extremity_statehashes.push(( |
|
|
|
|
db.rooms |
|
|
|
|
.pdu_shortstatehash(&leaf_pdu.event_id) |
|
|
|
|
.map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? |
|
|
|
|
.ok_or_else(|| { |
|
|
|
|
error!( |
|
|
|
|
"Found extremity pdu with no statehash in db: {:?}", |
|
|
|
|
leaf_pdu |
|
|
|
|
); |
|
|
|
|
"Found pdu with no statehash in db.".to_owned() |
|
|
|
|
})?, |
|
|
|
|
Some(leaf_pdu), |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
_ => { |
|
|
|
|
error!("Missing state snapshot for {:?}", id); |
|
|
|
|
return Err("Missing state snapshot.".to_owned()); |
|
|
|
|
} |
|
|
|
|
for id in &extremities { |
|
|
|
|
match db |
|
|
|
|
.rooms |
|
|
|
|
.get_pdu(&id) |
|
|
|
|
.map_err(|_| "Failed to ask db for pdu.".to_owned())? |
|
|
|
|
{ |
|
|
|
|
Some(leaf_pdu) => { |
|
|
|
|
extremity_statehashes.push(( |
|
|
|
|
db.rooms |
|
|
|
|
.pdu_shortstatehash(&leaf_pdu.event_id) |
|
|
|
|
.map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? |
|
|
|
|
.ok_or_else(|| { |
|
|
|
|
error!( |
|
|
|
|
"Found extremity pdu with no statehash in db: {:?}", |
|
|
|
|
leaf_pdu |
|
|
|
|
); |
|
|
|
|
"Found pdu with no statehash in db.".to_owned() |
|
|
|
|
})?, |
|
|
|
|
Some(leaf_pdu), |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
_ => { |
|
|
|
|
error!("Missing state snapshot for {:?}", id); |
|
|
|
|
return Err("Missing state snapshot.".to_owned()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 12. Ensure that the state is derived from the previous current state (i.e. we calculated
|
|
|
|
|
// by doing state res where one of the inputs was a previously trusted set of state,
|
|
|
|
|
// don't just trust a set of state we got from a remote).
|
|
|
|
|
// 12. Ensure that the state is derived from the previous current state (i.e. we calculated
|
|
|
|
|
// by doing state res where one of the inputs was a previously trusted set of state,
|
|
|
|
|
// don't just trust a set of state we got from a remote).
|
|
|
|
|
|
|
|
|
|
// We do this by adding the current state to the list of fork states
|
|
|
|
|
let current_statehash = db |
|
|
|
|
.rooms |
|
|
|
|
.current_shortstatehash(&room_id) |
|
|
|
|
.map_err(|_| "Failed to load current state hash.".to_owned())? |
|
|
|
|
.expect("every room has state"); |
|
|
|
|
// We do this by adding the current state to the list of fork states
|
|
|
|
|
let current_statehash = db |
|
|
|
|
.rooms |
|
|
|
|
.current_shortstatehash(&room_id) |
|
|
|
|
.map_err(|_| "Failed to load current state hash.".to_owned())? |
|
|
|
|
.expect("every room has state"); |
|
|
|
|
|
|
|
|
|
let current_state = db |
|
|
|
|
.rooms |
|
|
|
|
.state_full(current_statehash) |
|
|
|
|
.map_err(|_| "Failed to load room state.")?; |
|
|
|
|
let current_state = db |
|
|
|
|
.rooms |
|
|
|
|
.state_full(current_statehash) |
|
|
|
|
.map_err(|_| "Failed to load room state.")?; |
|
|
|
|
|
|
|
|
|
extremity_statehashes.push((current_statehash.clone(), None)); |
|
|
|
|
extremity_statehashes.push((current_statehash.clone(), None)); |
|
|
|
|
|
|
|
|
|
let mut fork_states = Vec::new(); |
|
|
|
|
for (statehash, leaf_pdu) in extremity_statehashes { |
|
|
|
|
let mut leaf_state = db |
|
|
|
|
.rooms |
|
|
|
|
.state_full(statehash) |
|
|
|
|
.map_err(|_| "Failed to ask db for room state.".to_owned())?; |
|
|
|
|
|
|
|
|
|
if let Some(leaf_pdu) = leaf_pdu { |
|
|
|
|
if let Some(state_key) = &leaf_pdu.state_key { |
|
|
|
|
// Now it's the state after
|
|
|
|
|
let key = (leaf_pdu.kind.clone(), state_key.clone()); |
|
|
|
|
leaf_state.insert(key, leaf_pdu); |
|
|
|
|
} |
|
|
|
|
let mut fork_states = Vec::new(); |
|
|
|
|
for (statehash, leaf_pdu) in extremity_statehashes { |
|
|
|
|
let mut leaf_state = db |
|
|
|
|
.rooms |
|
|
|
|
.state_full(statehash) |
|
|
|
|
.map_err(|_| "Failed to ask db for room state.".to_owned())?; |
|
|
|
|
|
|
|
|
|
if let Some(leaf_pdu) = leaf_pdu { |
|
|
|
|
if let Some(state_key) = &leaf_pdu.state_key { |
|
|
|
|
// Now it's the state after
|
|
|
|
|
let key = (leaf_pdu.kind.clone(), state_key.clone()); |
|
|
|
|
leaf_state.insert(key, leaf_pdu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fork_states.push(leaf_state); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We also add state after incoming event to the fork states
|
|
|
|
|
extremities.insert(incoming_pdu.event_id.clone()); |
|
|
|
|
let mut state_after = state_at_incoming_event.clone(); |
|
|
|
|
if let Some(state_key) = &incoming_pdu.state_key { |
|
|
|
|
state_after.insert( |
|
|
|
|
(incoming_pdu.kind.clone(), state_key.clone()), |
|
|
|
|
incoming_pdu.clone(), |
|
|
|
|
fork_states.push(leaf_state); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We also add state after incoming event to the fork states
|
|
|
|
|
extremities.insert(incoming_pdu.event_id.clone()); |
|
|
|
|
let mut state_after = state_at_incoming_event.clone(); |
|
|
|
|
if let Some(state_key) = &incoming_pdu.state_key { |
|
|
|
|
state_after.insert( |
|
|
|
|
(incoming_pdu.kind.clone(), state_key.clone()), |
|
|
|
|
incoming_pdu.clone(), |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
fork_states.push(state_after.clone()); |
|
|
|
|
|
|
|
|
|
let mut update_state = false; |
|
|
|
|
// 14. Use state resolution to find new room state
|
|
|
|
|
let new_room_state = if fork_states.is_empty() { |
|
|
|
|
return Err("State is empty.".to_owned()); |
|
|
|
|
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { |
|
|
|
|
// There was only one state, so it has to be the room's current state (because that is
|
|
|
|
|
// always included)
|
|
|
|
|
fork_states[0] |
|
|
|
|
.iter() |
|
|
|
|
.map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) |
|
|
|
|
.collect() |
|
|
|
|
} else { |
|
|
|
|
// We do need to force an update to this room's state
|
|
|
|
|
update_state = true; |
|
|
|
|
|
|
|
|
|
let fork_states = &fork_states |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|map| { |
|
|
|
|
map.into_iter() |
|
|
|
|
.map(|(k, v)| (k, v.event_id.clone())) |
|
|
|
|
.collect::<StateMap<_>>() |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
let mut auth_chain_sets = Vec::new(); |
|
|
|
|
for state in fork_states { |
|
|
|
|
auth_chain_sets.push( |
|
|
|
|
get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) |
|
|
|
|
.map_err(|_| "Failed to load auth chain.".to_owned())? |
|
|
|
|
.collect(), |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
fork_states.push(state_after.clone()); |
|
|
|
|
|
|
|
|
|
let mut update_state = false; |
|
|
|
|
// 14. Use state resolution to find new room state
|
|
|
|
|
let new_room_state = if fork_states.is_empty() { |
|
|
|
|
return Err("State is empty.".to_owned()); |
|
|
|
|
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { |
|
|
|
|
// There was only one state, so it has to be the room's current state (because that is
|
|
|
|
|
// always included)
|
|
|
|
|
fork_states[0] |
|
|
|
|
.iter() |
|
|
|
|
.map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) |
|
|
|
|
.collect() |
|
|
|
|
} else { |
|
|
|
|
// We do need to force an update to this room's state
|
|
|
|
|
update_state = true; |
|
|
|
|
|
|
|
|
|
let fork_states = &fork_states |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|map| { |
|
|
|
|
map.into_iter() |
|
|
|
|
.map(|(k, v)| (k, v.event_id.clone())) |
|
|
|
|
.collect::<StateMap<_>>() |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
let mut auth_chain_sets = Vec::new(); |
|
|
|
|
for state in fork_states { |
|
|
|
|
auth_chain_sets.push( |
|
|
|
|
get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) |
|
|
|
|
.map_err(|_| "Failed to load auth chain.".to_owned())? |
|
|
|
|
.collect(), |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let state = match state_res::StateResolution::resolve( |
|
|
|
|
&room_id, |
|
|
|
|
room_version_id, |
|
|
|
|
fork_states, |
|
|
|
|
auth_chain_sets, |
|
|
|
|
|id| { |
|
|
|
|
let res = db.rooms.get_pdu(id); |
|
|
|
|
if let Err(e) = &res { |
|
|
|
|
error!("LOOK AT ME Failed to fetch event: {}", e); |
|
|
|
|
} |
|
|
|
|
res.ok().flatten() |
|
|
|
|
}, |
|
|
|
|
) { |
|
|
|
|
Ok(new_state) => new_state, |
|
|
|
|
Err(_) => { |
|
|
|
|
return Err("State resolution failed, either an event could not be found or deserialization".into()); |
|
|
|
|
let state = match state_res::StateResolution::resolve( |
|
|
|
|
&room_id, |
|
|
|
|
room_version_id, |
|
|
|
|
fork_states, |
|
|
|
|
auth_chain_sets, |
|
|
|
|
|id| { |
|
|
|
|
let res = db.rooms.get_pdu(id); |
|
|
|
|
if let Err(e) = &res { |
|
|
|
|
error!("LOOK AT ME Failed to fetch event: {}", e); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
state |
|
|
|
|
res.ok().flatten() |
|
|
|
|
}, |
|
|
|
|
) { |
|
|
|
|
Ok(new_state) => new_state, |
|
|
|
|
Err(_) => { |
|
|
|
|
return Err("State resolution failed, either an event could not be found or deserialization".into()); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
debug!("starting soft fail auth check"); |
|
|
|
|
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
|
|
|
|
|
let soft_fail = !state_res::event_auth::auth_check( |
|
|
|
|
&room_version, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
previous_create, |
|
|
|
|
¤t_state, |
|
|
|
|
None, |
|
|
|
|
) |
|
|
|
|
.map_err(|_e| "Auth check failed.".to_owned())?; |
|
|
|
|
|
|
|
|
|
let mut pdu_id = None; |
|
|
|
|
if !soft_fail { |
|
|
|
|
// Now that the event has passed all auth it is added into the timeline.
|
|
|
|
|
// We use the `state_at_event` instead of `state_after` so we accurately
|
|
|
|
|
// represent the state for this event.
|
|
|
|
|
pdu_id = Some( |
|
|
|
|
append_incoming_pdu( |
|
|
|
|
&db, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
val, |
|
|
|
|
extremities, |
|
|
|
|
&state_at_incoming_event, |
|
|
|
|
&state_lock, |
|
|
|
|
) |
|
|
|
|
.map_err(|_| "Failed to add pdu to db.".to_owned())?, |
|
|
|
|
); |
|
|
|
|
debug!("Appended incoming pdu."); |
|
|
|
|
} else { |
|
|
|
|
warn!("Event was soft failed: {:?}", incoming_pdu); |
|
|
|
|
} |
|
|
|
|
state |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Set the new room state to the resolved state
|
|
|
|
|
if update_state { |
|
|
|
|
db.rooms |
|
|
|
|
.force_state(&room_id, new_room_state, &db) |
|
|
|
|
.map_err(|_| "Failed to set new room state.".to_owned())?; |
|
|
|
|
} |
|
|
|
|
debug!("Updated resolved state"); |
|
|
|
|
debug!("starting soft fail auth check"); |
|
|
|
|
// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
|
|
|
|
|
let soft_fail = !state_res::event_auth::auth_check( |
|
|
|
|
&room_version, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
previous_create, |
|
|
|
|
¤t_state, |
|
|
|
|
None, |
|
|
|
|
) |
|
|
|
|
.map_err(|_e| "Auth check failed.".to_owned())?; |
|
|
|
|
|
|
|
|
|
let mut pdu_id = None; |
|
|
|
|
if !soft_fail { |
|
|
|
|
// Now that the event has passed all auth it is added into the timeline.
|
|
|
|
|
// We use the `state_at_event` instead of `state_after` so we accurately
|
|
|
|
|
// represent the state for this event.
|
|
|
|
|
pdu_id = Some( |
|
|
|
|
append_incoming_pdu( |
|
|
|
|
&db, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
val, |
|
|
|
|
extremities, |
|
|
|
|
&state_at_incoming_event, |
|
|
|
|
&state_lock, |
|
|
|
|
) |
|
|
|
|
.map_err(|_| "Failed to add pdu to db.".to_owned())?, |
|
|
|
|
); |
|
|
|
|
debug!("Appended incoming pdu."); |
|
|
|
|
} else { |
|
|
|
|
warn!("Event was soft failed: {:?}", incoming_pdu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if soft_fail { |
|
|
|
|
// Soft fail, we leave the event as an outlier but don't add it to the timeline
|
|
|
|
|
return Err("Event has been soft failed".into()); |
|
|
|
|
} |
|
|
|
|
// Set the new room state to the resolved state
|
|
|
|
|
if update_state { |
|
|
|
|
db.rooms |
|
|
|
|
.force_state(&room_id, new_room_state, &db) |
|
|
|
|
.map_err(|_| "Failed to set new room state.".to_owned())?; |
|
|
|
|
} |
|
|
|
|
debug!("Updated resolved state"); |
|
|
|
|
|
|
|
|
|
if soft_fail { |
|
|
|
|
// Soft fail, we leave the event as an outlier but don't add it to the timeline
|
|
|
|
|
return Err("Event has been soft failed".into()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Event has passed all auth/stateres checks
|
|
|
|
|
drop(state_lock); |
|
|
|
|
Ok(pdu_id) |
|
|
|
|
// Event has passed all auth/stateres checks
|
|
|
|
|
drop(state_lock); |
|
|
|
|
Ok(pdu_id) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
|
|
|
|
@ -1535,9 +1570,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1535,9 +1570,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok((pdu, json)) => { |
|
|
|
|
(pdu, Some(json)) |
|
|
|
|
} |
|
|
|
|
Ok((pdu, json)) => (pdu, Some(json)), |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
|
back_off(id.clone()); |
|
|
|
|
|