|
|
|
|
@ -1392,12 +1392,11 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1392,12 +1392,11 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
let mut starting_events = Vec::with_capacity(leaf_state.len()); |
|
|
|
|
|
|
|
|
|
for (k, id) in leaf_state { |
|
|
|
|
let k = db |
|
|
|
|
.rooms |
|
|
|
|
.get_statekey_from_short(k) |
|
|
|
|
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; |
|
|
|
|
|
|
|
|
|
state.insert(k, id.clone()); |
|
|
|
|
if let Ok(k) = db.rooms.get_statekey_from_short(k) { |
|
|
|
|
state.insert(k, id.clone()); |
|
|
|
|
} else { |
|
|
|
|
warn!("Failed to get_statekey_from_short."); |
|
|
|
|
} |
|
|
|
|
starting_events.push(id); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -1755,11 +1754,16 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1755,11 +1754,16 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
.into_iter() |
|
|
|
|
.map(|map| { |
|
|
|
|
map.into_iter() |
|
|
|
|
.map(|(k, id)| db.rooms.get_statekey_from_short(k).map(|k| (k, id))) |
|
|
|
|
.collect::<Result<StateMap<_>>>() |
|
|
|
|
.filter_map(|(k, id)| { |
|
|
|
|
db.rooms |
|
|
|
|
.get_statekey_from_short(k) |
|
|
|
|
.map(|k| (k, id)) |
|
|
|
|
.map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) |
|
|
|
|
.ok() |
|
|
|
|
}) |
|
|
|
|
.collect::<StateMap<_>>() |
|
|
|
|
}) |
|
|
|
|
.collect::<Result<_>>() |
|
|
|
|
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
let state = match state_res::resolve( |
|
|
|
|
room_version_id, |
|
|
|
|
@ -1871,73 +1875,78 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1871,73 +1875,78 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
|
// b. Look at outlier pdu tree
|
|
|
|
|
// (get_pdu_json checks both)
|
|
|
|
|
let local_pdu = db.rooms.get_pdu(id); |
|
|
|
|
let pdu = match local_pdu { |
|
|
|
|
Ok(Some(pdu)) => { |
|
|
|
|
if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { |
|
|
|
|
trace!("Found {} in db", id); |
|
|
|
|
pdus.push((local_pdu, None)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// c. Ask origin server over federation
|
|
|
|
|
// We also handle its auth chain here so we don't get a stack overflow in
|
|
|
|
|
// handle_outlier_pdu.
|
|
|
|
|
let mut todo_auth_events = vec![id]; |
|
|
|
|
let mut events_in_reverse_order = Vec::new(); |
|
|
|
|
while let Some(next_id) = todo_auth_events.pop() { |
|
|
|
|
if let Ok(Some(_)) = db.rooms.get_pdu(next_id) { |
|
|
|
|
trace!("Found {} in db", id); |
|
|
|
|
(pdu, None) |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
Ok(None) => { |
|
|
|
|
// c. Ask origin server over federation
|
|
|
|
|
warn!("Fetching {} over federation.", id); |
|
|
|
|
match db |
|
|
|
|
.sending |
|
|
|
|
.send_federation_request( |
|
|
|
|
&db.globals, |
|
|
|
|
origin, |
|
|
|
|
get_event::v1::Request { event_id: id }, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(res) => { |
|
|
|
|
warn!("Got {} over federation", id); |
|
|
|
|
let (calculated_event_id, value) = |
|
|
|
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(_) => { |
|
|
|
|
back_off((**id).to_owned()); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if calculated_event_id != **id { |
|
|
|
|
warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", |
|
|
|
|
id, calculated_event_id, &res.pdu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This will also fetch the auth chain
|
|
|
|
|
match handle_outlier_pdu( |
|
|
|
|
origin, |
|
|
|
|
create_event, |
|
|
|
|
id, |
|
|
|
|
room_id, |
|
|
|
|
value.clone(), |
|
|
|
|
db, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok((pdu, json)) => (pdu, Some(json)), |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
|
back_off((**id).to_owned()); |
|
|
|
|
warn!("Fetching {} over federation.", next_id); |
|
|
|
|
match db |
|
|
|
|
.sending |
|
|
|
|
.send_federation_request( |
|
|
|
|
&db.globals, |
|
|
|
|
origin, |
|
|
|
|
get_event::v1::Request { event_id: next_id }, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(res) => { |
|
|
|
|
warn!("Got {} over federation", next_id); |
|
|
|
|
let (calculated_event_id, value) = |
|
|
|
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { |
|
|
|
|
Ok(t) => t, |
|
|
|
|
Err(_) => { |
|
|
|
|
back_off((**next_id).to_owned()); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(_) => { |
|
|
|
|
warn!("Failed to fetch event: {}", id); |
|
|
|
|
back_off((**id).to_owned()); |
|
|
|
|
continue; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if calculated_event_id != **next_id { |
|
|
|
|
warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", |
|
|
|
|
next_id, calculated_event_id, &res.pdu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
events_in_reverse_order.push((next_id, value)); |
|
|
|
|
} |
|
|
|
|
Err(_) => { |
|
|
|
|
warn!("Failed to fetch event: {}", next_id); |
|
|
|
|
back_off((**next_id).to_owned()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Error loading {}: {}", id, e); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while let Some((next_id, value)) = events_in_reverse_order.pop() { |
|
|
|
|
match handle_outlier_pdu( |
|
|
|
|
origin, |
|
|
|
|
create_event, |
|
|
|
|
next_id, |
|
|
|
|
room_id, |
|
|
|
|
value.clone(), |
|
|
|
|
db, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok((pdu, json)) => { |
|
|
|
|
pdus.push((pdu, Some(json))); |
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Authentication of event {} failed: {:?}", next_id, e); |
|
|
|
|
back_off((**next_id).to_owned()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
pdus.push(pdu); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
pdus |
|
|
|
|
}) |
|
|
|
|
|