|
|
|
|
@ -1248,7 +1248,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1248,7 +1248,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
&db.globals, |
|
|
|
|
origin, |
|
|
|
|
get_room_state_ids::v1::Request { |
|
|
|
|
room_id: &room_id, |
|
|
|
|
room_id, |
|
|
|
|
event_id: &incoming_pdu.event_id, |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
@ -1257,15 +1257,15 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1257,15 +1257,15 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
Ok(res) => { |
|
|
|
|
warn!("Fetching state events at event."); |
|
|
|
|
let state_vec = fetch_and_handle_outliers( |
|
|
|
|
&db, |
|
|
|
|
db, |
|
|
|
|
origin, |
|
|
|
|
&res.pdu_ids |
|
|
|
|
.iter() |
|
|
|
|
.cloned() |
|
|
|
|
.map(Arc::new) |
|
|
|
|
.collect::<Vec<_>>(), |
|
|
|
|
&create_event, |
|
|
|
|
&room_id, |
|
|
|
|
create_event, |
|
|
|
|
room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await; |
|
|
|
|
@ -1350,11 +1350,11 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1350,11 +1350,11 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
None, // TODO: third party invite
|
|
|
|
|
|k, s| { |
|
|
|
|
db.rooms |
|
|
|
|
.get_shortstatekey(&k, &s) |
|
|
|
|
.get_shortstatekey(k, s) |
|
|
|
|
.ok() |
|
|
|
|
.flatten() |
|
|
|
|
.and_then(|shortstatekey| state_at_incoming_event.get(&shortstatekey)) |
|
|
|
|
.and_then(|event_id| db.rooms.get_pdu(&event_id).ok().flatten()) |
|
|
|
|
.and_then(|event_id| db.rooms.get_pdu(event_id).ok().flatten()) |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
.map_err(|_e| "Auth check failed.".to_owned())? |
|
|
|
|
@ -1379,7 +1379,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1379,7 +1379,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
// applied. We start with the previous extremities (aka leaves)
|
|
|
|
|
let mut extremities = db |
|
|
|
|
.rooms |
|
|
|
|
.get_pdu_leaves(&room_id) |
|
|
|
|
.get_pdu_leaves(room_id) |
|
|
|
|
.map_err(|_| "Failed to load room leaves".to_owned())?; |
|
|
|
|
|
|
|
|
|
// Remove any forward extremities that are referenced by this incoming event's prev_events
|
|
|
|
|
@ -1390,11 +1390,11 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1390,11 +1390,11 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Only keep those extremities were not referenced yet
|
|
|
|
|
extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true))); |
|
|
|
|
extremities.retain(|id| !matches!(db.rooms.is_event_referenced(room_id, id), Ok(true))); |
|
|
|
|
|
|
|
|
|
let current_sstatehash = db |
|
|
|
|
.rooms |
|
|
|
|
.current_shortstatehash(&room_id) |
|
|
|
|
.current_shortstatehash(room_id) |
|
|
|
|
.map_err(|_| "Failed to load current state hash.".to_owned())? |
|
|
|
|
.expect("every room has state"); |
|
|
|
|
|
|
|
|
|
@ -1409,7 +1409,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1409,7 +1409,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
for id in dbg!(&extremities) { |
|
|
|
|
match db |
|
|
|
|
.rooms |
|
|
|
|
.get_pdu(&id) |
|
|
|
|
.get_pdu(id) |
|
|
|
|
.map_err(|_| "Failed to ask db for pdu.".to_owned())? |
|
|
|
|
{ |
|
|
|
|
Some(leaf_pdu) => { |
|
|
|
|
@ -1486,7 +1486,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1486,7 +1486,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
.iter() |
|
|
|
|
.map(|(k, id)| { |
|
|
|
|
db.rooms |
|
|
|
|
.compress_state_event(*k, &id, &db.globals) |
|
|
|
|
.compress_state_event(*k, id, &db.globals) |
|
|
|
|
.map_err(|_| "Failed to compress_state_event.".to_owned()) |
|
|
|
|
}) |
|
|
|
|
.collect::<StdResult<_, String>>()? |
|
|
|
|
@ -1514,7 +1514,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1514,7 +1514,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let state = match state_res::StateResolution::resolve( |
|
|
|
|
&room_id, |
|
|
|
|
room_id, |
|
|
|
|
room_version_id, |
|
|
|
|
fork_states, |
|
|
|
|
auth_chain_sets, |
|
|
|
|
@ -1549,7 +1549,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1549,7 +1549,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
// Set the new room state to the resolved state
|
|
|
|
|
if update_state { |
|
|
|
|
db.rooms |
|
|
|
|
.force_state(&room_id, new_room_state, &db) |
|
|
|
|
.force_state(room_id, new_room_state, db) |
|
|
|
|
.map_err(|_| "Failed to set new room state.".to_owned())?; |
|
|
|
|
} |
|
|
|
|
debug!("Updated resolved state"); |
|
|
|
|
@ -1562,7 +1562,7 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1562,7 +1562,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
let auth_events = db |
|
|
|
|
.rooms |
|
|
|
|
.get_auth_events( |
|
|
|
|
&room_id, |
|
|
|
|
room_id, |
|
|
|
|
&incoming_pdu.kind, |
|
|
|
|
&incoming_pdu.sender, |
|
|
|
|
incoming_pdu.state_key.as_deref(), |
|
|
|
|
@ -1587,13 +1587,13 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1587,13 +1587,13 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
.iter() |
|
|
|
|
.map(|(shortstatekey, id)| { |
|
|
|
|
db.rooms |
|
|
|
|
.compress_state_event(*shortstatekey, &id, &db.globals) |
|
|
|
|
.compress_state_event(*shortstatekey, id, &db.globals) |
|
|
|
|
.map_err(|_| "Failed to compress_state_event".to_owned()) |
|
|
|
|
}) |
|
|
|
|
.collect::<StdResult<_, String>>()?; |
|
|
|
|
|
|
|
|
|
let pdu_id = append_incoming_pdu( |
|
|
|
|
&db, |
|
|
|
|
db, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
val, |
|
|
|
|
extremities, |
|
|
|
|
@ -1644,7 +1644,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1644,7 +1644,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
|
|
|
|
|
let mut pdus = vec![]; |
|
|
|
|
for id in events { |
|
|
|
|
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&id) { |
|
|
|
|
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(id) { |
|
|
|
|
// Exponential backoff
|
|
|
|
|
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); |
|
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
|
@ -1660,7 +1660,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1660,7 +1660,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
|
// b. Look at outlier pdu tree
|
|
|
|
|
// (get_pdu_json checks both)
|
|
|
|
|
let local_pdu = db.rooms.get_pdu(&id); |
|
|
|
|
let local_pdu = db.rooms.get_pdu(id); |
|
|
|
|
let pdu = match local_pdu { |
|
|
|
|
Ok(Some(pdu)) => { |
|
|
|
|
trace!("Found {} in db", id); |
|
|
|
|
@ -1674,7 +1674,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1674,7 +1674,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
.send_federation_request( |
|
|
|
|
&db.globals, |
|
|
|
|
origin, |
|
|
|
|
get_event::v1::Request { event_id: &id }, |
|
|
|
|
get_event::v1::Request { event_id: id }, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
@ -1694,7 +1694,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1694,7 +1694,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
origin, |
|
|
|
|
create_event, |
|
|
|
|
&event_id, |
|
|
|
|
&room_id, |
|
|
|
|
room_id, |
|
|
|
|
value.clone(), |
|
|
|
|
db, |
|
|
|
|
pub_key_map, |
|
|
|
|
@ -1842,7 +1842,7 @@ pub(crate) async fn fetch_signing_keys(
@@ -1842,7 +1842,7 @@ pub(crate) async fn fetch_signing_keys(
|
|
|
|
|
.sending |
|
|
|
|
.send_federation_request( |
|
|
|
|
&db.globals, |
|
|
|
|
&server, |
|
|
|
|
server, |
|
|
|
|
get_remote_server_keys::v2::Request::new( |
|
|
|
|
origin, |
|
|
|
|
MilliSecondsSinceUnixEpoch::from_system_time( |
|
|
|
|
|