|
|
|
@ -884,9 +884,15 @@ pub async fn handle_incoming_pdu<'a>( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
|
|
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
|
|
|
|
|
|
|
let mut visited = HashSet::new(); |
|
|
|
let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); |
|
|
|
let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); |
|
|
|
let mut todo_timeline_stack = Vec::new(); |
|
|
|
let mut todo_timeline_stack = Vec::new(); |
|
|
|
while let Some(prev_event_id) = todo_outlier_stack.pop() { |
|
|
|
while let Some(prev_event_id) = todo_outlier_stack.pop() { |
|
|
|
|
|
|
|
if visited.contains(&prev_event_id) { |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
visited.insert(prev_event_id.clone()); |
|
|
|
|
|
|
|
|
|
|
|
if let Some((pdu, json_opt)) = fetch_and_handle_outliers( |
|
|
|
if let Some((pdu, json_opt)) = fetch_and_handle_outliers( |
|
|
|
db, |
|
|
|
db, |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
@ -898,7 +904,9 @@ pub async fn handle_incoming_pdu<'a>( |
|
|
|
.await |
|
|
|
.await |
|
|
|
.pop() |
|
|
|
.pop() |
|
|
|
{ |
|
|
|
{ |
|
|
|
if let Some(json) = json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten()) { |
|
|
|
if let Some(json) = |
|
|
|
|
|
|
|
json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten()) |
|
|
|
|
|
|
|
{ |
|
|
|
if incoming_pdu.origin_server_ts |
|
|
|
if incoming_pdu.origin_server_ts |
|
|
|
> db.rooms |
|
|
|
> db.rooms |
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
@ -949,8 +957,6 @@ fn handle_outlier_pdu<'a>( |
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> |
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> |
|
|
|
{ |
|
|
|
{ |
|
|
|
Box::pin(async move { |
|
|
|
Box::pin(async move { |
|
|
|
let start_time = Instant::now(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
|
|
|
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
|
|
|
|
|
|
|
|
|
|
|
// We go through all the signatures we see on the value and fetch the corresponding signing
|
|
|
|
// We go through all the signatures we see on the value and fetch the corresponding signing
|
|
|
|
@ -1109,6 +1115,9 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
room_id: &RoomId, |
|
|
|
room_id: &RoomId, |
|
|
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
) -> StdResult<Option<Vec<u8>>, String> { |
|
|
|
) -> StdResult<Option<Vec<u8>>, String> { |
|
|
|
|
|
|
|
if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) { |
|
|
|
|
|
|
|
return Ok(Some(pduid)); |
|
|
|
|
|
|
|
} |
|
|
|
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
|
|
|
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
|
|
|
// doing all the checks in this list starting at 1. These are not timeline events.
|
|
|
|
// doing all the checks in this list starting at 1. These are not timeline events.
|
|
|
|
|
|
|
|
|
|
|
|
|