|
|
|
|
@ -867,17 +867,19 @@ pub async fn handle_incoming_pdu<'a>(
@@ -867,17 +867,19 @@ pub async fn handle_incoming_pdu<'a>(
|
|
|
|
|
.map_err(|_| "Failed to ask database for event.".to_owned())? |
|
|
|
|
.ok_or_else(|| "Failed to find create event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
let (incoming_pdu, val) = handle_outlier_pdu(origin, &create_event, event_id, room_id, value, db, pub_key_map).await?; |
|
|
|
|
let (incoming_pdu, val) = handle_outlier_pdu( |
|
|
|
|
origin, |
|
|
|
|
&create_event, |
|
|
|
|
event_id, |
|
|
|
|
room_id, |
|
|
|
|
value, |
|
|
|
|
db, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
// 8. if not timeline event: stop
|
|
|
|
|
if !is_timeline_event |
|
|
|
|
|| incoming_pdu.origin_server_ts |
|
|
|
|
< db.rooms |
|
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
|
.map_err(|_| "Error loading first room event.".to_owned())? |
|
|
|
|
.expect("Room exists") |
|
|
|
|
.origin_server_ts |
|
|
|
|
{ |
|
|
|
|
if !is_timeline_event { |
|
|
|
|
return Ok(None); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -893,16 +895,45 @@ pub async fn handle_incoming_pdu<'a>(
@@ -893,16 +895,45 @@ pub async fn handle_incoming_pdu<'a>(
|
|
|
|
|
&room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await.pop() { |
|
|
|
|
.await |
|
|
|
|
.pop() |
|
|
|
|
{ |
|
|
|
|
if incoming_pdu.origin_server_ts |
|
|
|
|
> db.rooms |
|
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
|
.map_err(|_| "Error loading first room event.".to_owned())? |
|
|
|
|
.expect("Room exists") |
|
|
|
|
.origin_server_ts |
|
|
|
|
{ |
|
|
|
|
todo_outlier_stack.extend(pdu.prev_events.iter().cloned()); |
|
|
|
|
todo_timeline_stack.push((pdu, json)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while let Some(prev) = todo_timeline_stack.pop() { |
|
|
|
|
upgrade_outlier_to_timeline_pdu(prev.0, prev.1, &create_event, origin, db, room_id, pub_key_map).await?; |
|
|
|
|
upgrade_outlier_to_timeline_pdu( |
|
|
|
|
prev.0, |
|
|
|
|
prev.1, |
|
|
|
|
&create_event, |
|
|
|
|
origin, |
|
|
|
|
db, |
|
|
|
|
room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, db, room_id, pub_key_map).await |
|
|
|
|
upgrade_outlier_to_timeline_pdu( |
|
|
|
|
incoming_pdu, |
|
|
|
|
val, |
|
|
|
|
&create_event, |
|
|
|
|
origin, |
|
|
|
|
db, |
|
|
|
|
room_id, |
|
|
|
|
pub_key_map, |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn handle_outlier_pdu<'a>( |
|
|
|
|
@ -913,7 +944,8 @@ fn handle_outlier_pdu<'a>(
@@ -913,7 +944,8 @@ fn handle_outlier_pdu<'a>(
|
|
|
|
|
value: BTreeMap<String, CanonicalJsonValue>, |
|
|
|
|
db: &'a Database, |
|
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> { |
|
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> |
|
|
|
|
{ |
|
|
|
|
Box::pin(async move { |
|
|
|
|
let start_time = Instant::now(); |
|
|
|
|
|
|
|
|
|
@ -1064,7 +1096,6 @@ fn handle_outlier_pdu<'a>(
@@ -1064,7 +1096,6 @@ fn handle_outlier_pdu<'a>(
|
|
|
|
|
|
|
|
|
|
Ok((incoming_pdu, val)) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
|
@ -1120,7 +1151,8 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1120,7 +1151,8 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
}) |
|
|
|
|
.collect::<HashMap<_, _>>(); |
|
|
|
|
|
|
|
|
|
let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { |
|
|
|
|
let prev_pdu = |
|
|
|
|
db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { |
|
|
|
|
"Could not find prev event, but we know the state.".to_owned() |
|
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
@ -1163,15 +1195,19 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1163,15 +1195,19 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
|
|
|
|
|
let mut state = HashMap::new(); |
|
|
|
|
for (pdu, _) in state_vec { |
|
|
|
|
match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { |
|
|
|
|
match state.entry(( |
|
|
|
|
pdu.kind.clone(), |
|
|
|
|
pdu.state_key |
|
|
|
|
.clone() |
|
|
|
|
.ok_or_else(|| "Found non-state pdu in state events.".to_owned())?, |
|
|
|
|
)) { |
|
|
|
|
Entry::Vacant(v) => { |
|
|
|
|
v.insert(pdu); |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(_) => { |
|
|
|
|
return Err( |
|
|
|
|
"State event's type and state_key combination exists multiple times.".to_owned(), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(_) => return Err( |
|
|
|
|
"State event's type and state_key combination exists multiple times." |
|
|
|
|
.to_owned(), |
|
|
|
|
), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -1228,7 +1264,6 @@ async fn upgrade_outlier_to_timeline_pdu(
@@ -1228,7 +1264,6 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|
|
|
|
None |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !state_res::event_auth::auth_check( |
|
|
|
|
&room_version, |
|
|
|
|
&incoming_pdu, |
|
|
|
|
@ -1535,9 +1570,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
@@ -1535,9 +1570,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
|
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok((pdu, json)) => { |
|
|
|
|
(pdu, Some(json)) |
|
|
|
|
} |
|
|
|
|
Ok((pdu, json)) => (pdu, Some(json)), |
|
|
|
|
Err(e) => { |
|
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
|
back_off(id.clone()); |
|
|
|
|
|