|
|
|
@ -840,7 +840,7 @@ type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>; |
|
|
|
/// 14. Use state resolution to find new room state
|
|
|
|
/// 14. Use state resolution to find new room state
|
|
|
|
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
|
|
|
|
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
|
|
|
|
#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] |
|
|
|
#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] |
|
|
|
pub fn handle_incoming_pdu<'a>( |
|
|
|
pub async fn handle_incoming_pdu<'a>( |
|
|
|
origin: &'a ServerName, |
|
|
|
origin: &'a ServerName, |
|
|
|
event_id: &'a EventId, |
|
|
|
event_id: &'a EventId, |
|
|
|
room_id: &'a RoomId, |
|
|
|
room_id: &'a RoomId, |
|
|
|
@ -848,11 +848,7 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
is_timeline_event: bool, |
|
|
|
is_timeline_event: bool, |
|
|
|
db: &'a Database, |
|
|
|
db: &'a Database, |
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> { |
|
|
|
) -> StdResult<Option<Vec<u8>>, String> { |
|
|
|
Box::pin(async move { |
|
|
|
|
|
|
|
let start_time = Instant::now(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
|
|
|
|
|
|
|
match db.rooms.exists(&room_id) { |
|
|
|
match db.rooms.exists(&room_id) { |
|
|
|
Ok(true) => {} |
|
|
|
Ok(true) => {} |
|
|
|
_ => { |
|
|
|
_ => { |
|
|
|
@ -865,6 +861,64 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
return Ok(Some(pdu_id.to_vec())); |
|
|
|
return Ok(Some(pdu_id.to_vec())); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let create_event = db |
|
|
|
|
|
|
|
.rooms |
|
|
|
|
|
|
|
.room_state_get(&room_id, &EventType::RoomCreate, "") |
|
|
|
|
|
|
|
.map_err(|_| "Failed to ask database for event.".to_owned())? |
|
|
|
|
|
|
|
.ok_or_else(|| "Failed to find create event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let (incoming_pdu, val) = handle_outlier_pdu(origin, &create_event, event_id, room_id, value, db, pub_key_map).await?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 8. if not timeline event: stop
|
|
|
|
|
|
|
|
if !is_timeline_event |
|
|
|
|
|
|
|
|| incoming_pdu.origin_server_ts |
|
|
|
|
|
|
|
< db.rooms |
|
|
|
|
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
|
|
|
|
.map_err(|_| "Error loading first room event.".to_owned())? |
|
|
|
|
|
|
|
.expect("Room exists") |
|
|
|
|
|
|
|
.origin_server_ts |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
return Ok(None); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
|
|
|
|
|
|
|
let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); |
|
|
|
|
|
|
|
let mut todo_timeline_stack = Vec::new(); |
|
|
|
|
|
|
|
while let Some(prev_event_id) = todo_outlier_stack.pop() { |
|
|
|
|
|
|
|
if let Some((pdu, Some(json))) = fetch_and_handle_outliers( |
|
|
|
|
|
|
|
db, |
|
|
|
|
|
|
|
origin, |
|
|
|
|
|
|
|
&[prev_event_id], |
|
|
|
|
|
|
|
&create_event, |
|
|
|
|
|
|
|
&room_id, |
|
|
|
|
|
|
|
pub_key_map, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.await.pop() { |
|
|
|
|
|
|
|
todo_timeline_stack.push((pdu, json)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while let Some(prev) = todo_timeline_stack.pop() { |
|
|
|
|
|
|
|
upgrade_outlier_to_timeline_pdu(prev.0, prev.1, &create_event, origin, db, room_id, pub_key_map).await?; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, db, room_id, pub_key_map).await |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn handle_outlier_pdu<'a>( |
|
|
|
|
|
|
|
origin: &'a ServerName, |
|
|
|
|
|
|
|
create_event: &'a PduEvent, |
|
|
|
|
|
|
|
event_id: &'a EventId, |
|
|
|
|
|
|
|
room_id: &'a RoomId, |
|
|
|
|
|
|
|
value: BTreeMap<String, CanonicalJsonValue>, |
|
|
|
|
|
|
|
db: &'a Database, |
|
|
|
|
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
|
|
|
|
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> { |
|
|
|
|
|
|
|
Box::pin(async move { |
|
|
|
|
|
|
|
let start_time = Instant::now(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
|
|
|
|
|
|
|
|
|
|
|
// We go through all the signatures we see on the value and fetch the corresponding signing
|
|
|
|
// We go through all the signatures we see on the value and fetch the corresponding signing
|
|
|
|
// keys
|
|
|
|
// keys
|
|
|
|
fetch_required_signing_keys(&value, &pub_key_map, db) |
|
|
|
fetch_required_signing_keys(&value, &pub_key_map, db) |
|
|
|
@ -873,11 +927,6 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
|
|
|
|
|
|
|
|
// 2. Check signatures, otherwise drop
|
|
|
|
// 2. Check signatures, otherwise drop
|
|
|
|
// 3. check content hash, redact if doesn't match
|
|
|
|
// 3. check content hash, redact if doesn't match
|
|
|
|
let create_event = db |
|
|
|
|
|
|
|
.rooms |
|
|
|
|
|
|
|
.room_state_get(&room_id, &EventType::RoomCreate, "") |
|
|
|
|
|
|
|
.map_err(|_| "Failed to ask database for event.".to_owned())? |
|
|
|
|
|
|
|
.ok_or_else(|| "Failed to find create event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let create_event_content = |
|
|
|
let create_event_content = |
|
|
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |
|
|
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |
|
|
|
@ -924,13 +973,13 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
|
|
|
|
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
|
|
|
|
// EDIT: Step 5 is not applied anymore because it failed too often
|
|
|
|
// EDIT: Step 5 is not applied anymore because it failed too often
|
|
|
|
debug!("Fetching auth events for {}", incoming_pdu.event_id); |
|
|
|
debug!("Fetching auth events for {}", incoming_pdu.event_id); |
|
|
|
fetch_and_handle_events( |
|
|
|
fetch_and_handle_outliers( |
|
|
|
db, |
|
|
|
db, |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&incoming_pdu.auth_events, |
|
|
|
&incoming_pdu.auth_events, |
|
|
|
|
|
|
|
&create_event, |
|
|
|
&room_id, |
|
|
|
&room_id, |
|
|
|
pub_key_map, |
|
|
|
pub_key_map, |
|
|
|
false, |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
.await; |
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
@ -1013,37 +1062,20 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?; |
|
|
|
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?; |
|
|
|
debug!("Added pdu as outlier."); |
|
|
|
debug!("Added pdu as outlier."); |
|
|
|
|
|
|
|
|
|
|
|
// 8. if not timeline event: stop
|
|
|
|
Ok((incoming_pdu,val)) |
|
|
|
if !is_timeline_event |
|
|
|
}) |
|
|
|
|| incoming_pdu.origin_server_ts |
|
|
|
|
|
|
|
< db.rooms |
|
|
|
|
|
|
|
.first_pdu_in_room(&room_id) |
|
|
|
|
|
|
|
.map_err(|_| "Error loading first room event.".to_owned())? |
|
|
|
|
|
|
|
.expect("Room exists") |
|
|
|
|
|
|
|
.origin_server_ts |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
let elapsed = start_time.elapsed(); |
|
|
|
|
|
|
|
warn!( |
|
|
|
|
|
|
|
"Handling outlier event {} took {}m{}s", |
|
|
|
|
|
|
|
event_id, |
|
|
|
|
|
|
|
elapsed.as_secs() / 60, |
|
|
|
|
|
|
|
elapsed.as_secs() % 60 |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
return Ok(None); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: make not recursive
|
|
|
|
} |
|
|
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
|
|
|
|
|
|
|
fetch_and_handle_events( |
|
|
|
|
|
|
|
db, |
|
|
|
|
|
|
|
origin, |
|
|
|
|
|
|
|
&incoming_pdu.prev_events, |
|
|
|
|
|
|
|
&room_id, |
|
|
|
|
|
|
|
pub_key_map, |
|
|
|
|
|
|
|
true, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
|
|
|
|
incoming_pdu: Arc<PduEvent>, |
|
|
|
|
|
|
|
val: BTreeMap<String, CanonicalJsonValue>, |
|
|
|
|
|
|
|
create_event: &PduEvent, |
|
|
|
|
|
|
|
origin: &ServerName, |
|
|
|
|
|
|
|
db: &Database, |
|
|
|
|
|
|
|
room_id: &RoomId, |
|
|
|
|
|
|
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
|
|
|
|
) -> StdResult<Option<Vec<u8>>, String> { |
|
|
|
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
|
|
|
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
|
|
|
// doing all the checks in this list starting at 1. These are not timeline events.
|
|
|
|
// doing all the checks in this list starting at 1. These are not timeline events.
|
|
|
|
|
|
|
|
|
|
|
|
@ -1065,17 +1097,17 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
|
|
|
|
|
|
|
|
if let Some(Ok(state)) = state { |
|
|
|
if let Some(Ok(state)) = state { |
|
|
|
warn!("Using cached state"); |
|
|
|
warn!("Using cached state"); |
|
|
|
let mut state = fetch_and_handle_events( |
|
|
|
let mut state = fetch_and_handle_outliers( |
|
|
|
db, |
|
|
|
db, |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&state.into_iter().collect::<Vec<_>>(), |
|
|
|
&state.into_iter().collect::<Vec<_>>(), |
|
|
|
|
|
|
|
&create_event, |
|
|
|
&room_id, |
|
|
|
&room_id, |
|
|
|
pub_key_map, |
|
|
|
pub_key_map, |
|
|
|
false, |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
.await |
|
|
|
.await |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
.map(|pdu| { |
|
|
|
.map(|(pdu,_)| { |
|
|
|
( |
|
|
|
( |
|
|
|
( |
|
|
|
( |
|
|
|
pdu.kind.clone(), |
|
|
|
pdu.kind.clone(), |
|
|
|
@ -1119,18 +1151,18 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
{ |
|
|
|
{ |
|
|
|
Ok(res) => { |
|
|
|
Ok(res) => { |
|
|
|
debug!("Fetching state events at event."); |
|
|
|
debug!("Fetching state events at event."); |
|
|
|
let state_vec = fetch_and_handle_events( |
|
|
|
let state_vec = fetch_and_handle_outliers( |
|
|
|
&db, |
|
|
|
&db, |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&res.pdu_ids, |
|
|
|
&res.pdu_ids, |
|
|
|
|
|
|
|
&create_event, |
|
|
|
&room_id, |
|
|
|
&room_id, |
|
|
|
pub_key_map, |
|
|
|
pub_key_map, |
|
|
|
false, |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
.await; |
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
let mut state = HashMap::new(); |
|
|
|
let mut state = HashMap::new(); |
|
|
|
for pdu in state_vec { |
|
|
|
for (pdu, _) in state_vec { |
|
|
|
match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { |
|
|
|
match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { |
|
|
|
Entry::Vacant(v) => { |
|
|
|
Entry::Vacant(v) => { |
|
|
|
v.insert(pdu); |
|
|
|
v.insert(pdu); |
|
|
|
@ -1153,13 +1185,13 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
debug!("Fetching auth chain events at event."); |
|
|
|
debug!("Fetching auth chain events at event."); |
|
|
|
fetch_and_handle_events( |
|
|
|
fetch_and_handle_outliers( |
|
|
|
&db, |
|
|
|
&db, |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&res.auth_chain_ids, |
|
|
|
&res.auth_chain_ids, |
|
|
|
|
|
|
|
&create_event, |
|
|
|
&room_id, |
|
|
|
&room_id, |
|
|
|
pub_key_map, |
|
|
|
pub_key_map, |
|
|
|
false, |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
.await; |
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
@ -1175,6 +1207,28 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
state_at_incoming_event.expect("we always set this to some above"); |
|
|
|
state_at_incoming_event.expect("we always set this to some above"); |
|
|
|
|
|
|
|
|
|
|
|
// 11. Check the auth of the event passes based on the state of the event
|
|
|
|
// 11. Check the auth of the event passes based on the state of the event
|
|
|
|
|
|
|
|
let create_event_content = |
|
|
|
|
|
|
|
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |
|
|
|
|
|
|
|
.expect("Raw::from_value always works.") |
|
|
|
|
|
|
|
.deserialize() |
|
|
|
|
|
|
|
.map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let room_version_id = &create_event_content.room_version; |
|
|
|
|
|
|
|
let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// If the previous event was the create event special rules apply
|
|
|
|
|
|
|
|
let previous_create = if incoming_pdu.auth_events.len() == 1 |
|
|
|
|
|
|
|
&& incoming_pdu.prev_events == incoming_pdu.auth_events |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
db.rooms |
|
|
|
|
|
|
|
.get_pdu(&incoming_pdu.auth_events[0]) |
|
|
|
|
|
|
|
.map_err(|e| e.to_string())? |
|
|
|
|
|
|
|
.filter(|maybe_create| **maybe_create == *create_event) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
None |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if !state_res::event_auth::auth_check( |
|
|
|
if !state_res::event_auth::auth_check( |
|
|
|
&room_version, |
|
|
|
&room_version, |
|
|
|
&incoming_pdu, |
|
|
|
&incoming_pdu, |
|
|
|
@ -1396,34 +1450,27 @@ pub fn handle_incoming_pdu<'a>( |
|
|
|
|
|
|
|
|
|
|
|
// Event has passed all auth/stateres checks
|
|
|
|
// Event has passed all auth/stateres checks
|
|
|
|
drop(state_lock); |
|
|
|
drop(state_lock); |
|
|
|
|
|
|
|
|
|
|
|
let elapsed = start_time.elapsed(); |
|
|
|
|
|
|
|
warn!( |
|
|
|
|
|
|
|
"Handling timeline event {} took {}m{}s", |
|
|
|
|
|
|
|
event_id, |
|
|
|
|
|
|
|
elapsed.as_secs() / 60, |
|
|
|
|
|
|
|
elapsed.as_secs() % 60 |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
Ok(pdu_id) |
|
|
|
Ok(pdu_id) |
|
|
|
}) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
|
|
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
|
|
|
/// it is appended to the outliers Tree.
|
|
|
|
/// it is appended to the outliers Tree.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Returns pdu and if we fetched it over federation the raw json.
|
|
|
|
|
|
|
|
///
|
|
|
|
/// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
/// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
/// b. Look at outlier pdu tree
|
|
|
|
/// b. Look at outlier pdu tree
|
|
|
|
/// c. Ask origin server over federation
|
|
|
|
/// c. Ask origin server over federation
|
|
|
|
/// d. TODO: Ask other servers over federation?
|
|
|
|
/// d. TODO: Ask other servers over federation?
|
|
|
|
//#[tracing::instrument(skip(db, key_map, auth_cache))]
|
|
|
|
//#[tracing::instrument(skip(db, key_map, auth_cache))]
|
|
|
|
pub(crate) fn fetch_and_handle_events<'a>( |
|
|
|
pub(crate) fn fetch_and_handle_outliers<'a>( |
|
|
|
db: &'a Database, |
|
|
|
db: &'a Database, |
|
|
|
origin: &'a ServerName, |
|
|
|
origin: &'a ServerName, |
|
|
|
events: &'a [EventId], |
|
|
|
events: &'a [EventId], |
|
|
|
|
|
|
|
create_event: &'a PduEvent, |
|
|
|
room_id: &'a RoomId, |
|
|
|
room_id: &'a RoomId, |
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
are_timeline_events: bool, |
|
|
|
) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> { |
|
|
|
) -> AsyncRecursiveType<'a, Vec<Arc<PduEvent>>> { |
|
|
|
|
|
|
|
Box::pin(async move { |
|
|
|
Box::pin(async move { |
|
|
|
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { |
|
|
|
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { |
|
|
|
Entry::Vacant(e) => { |
|
|
|
Entry::Vacant(e) => { |
|
|
|
@ -1449,16 +1496,12 @@ pub(crate) fn fetch_and_handle_events<'a>( |
|
|
|
|
|
|
|
|
|
|
|
// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
// a. Look in the main timeline (pduid_pdu tree)
|
|
|
|
// b. Look at outlier pdu tree
|
|
|
|
// b. Look at outlier pdu tree
|
|
|
|
// (get_pdu checks both)
|
|
|
|
// (get_pdu_json checks both)
|
|
|
|
let local_pdu = if are_timeline_events { |
|
|
|
let local_pdu = db.rooms.get_pdu(&id); |
|
|
|
db.rooms.get_non_outlier_pdu(&id).map(|o| o.map(Arc::new)) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
db.rooms.get_pdu(&id) |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
let pdu = match local_pdu { |
|
|
|
let pdu = match local_pdu { |
|
|
|
Ok(Some(pdu)) => { |
|
|
|
Ok(Some(pdu)) => { |
|
|
|
trace!("Found {} in db", id); |
|
|
|
trace!("Found {} in db", id); |
|
|
|
pdu |
|
|
|
(pdu, None) |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(None) => { |
|
|
|
Ok(None) => { |
|
|
|
// c. Ask origin server over federation
|
|
|
|
// c. Ask origin server over federation
|
|
|
|
@ -1474,39 +1517,26 @@ pub(crate) fn fetch_and_handle_events<'a>( |
|
|
|
{ |
|
|
|
{ |
|
|
|
Ok(res) => { |
|
|
|
Ok(res) => { |
|
|
|
debug!("Got {} over federation", id); |
|
|
|
debug!("Got {} over federation", id); |
|
|
|
let (event_id, mut value) = |
|
|
|
let (event_id, value) = |
|
|
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { |
|
|
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { |
|
|
|
Ok(t) => t, |
|
|
|
Ok(t) => t, |
|
|
|
Err(_) => continue, |
|
|
|
Err(_) => continue, |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// This will also fetch the auth chain
|
|
|
|
// This will also fetch the auth chain
|
|
|
|
match handle_incoming_pdu( |
|
|
|
match handle_outlier_pdu( |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
|
|
|
|
create_event, |
|
|
|
&event_id, |
|
|
|
&event_id, |
|
|
|
&room_id, |
|
|
|
&room_id, |
|
|
|
value.clone(), |
|
|
|
value.clone(), |
|
|
|
are_timeline_events, |
|
|
|
|
|
|
|
db, |
|
|
|
db, |
|
|
|
pub_key_map, |
|
|
|
pub_key_map, |
|
|
|
) |
|
|
|
) |
|
|
|
.await |
|
|
|
.await |
|
|
|
{ |
|
|
|
{ |
|
|
|
Ok(_) => { |
|
|
|
Ok((pdu, json)) => { |
|
|
|
value.insert( |
|
|
|
(pdu, Some(json)) |
|
|
|
"event_id".to_owned(), |
|
|
|
|
|
|
|
CanonicalJsonValue::String(event_id.into()), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Arc::new( |
|
|
|
|
|
|
|
serde_json::from_value( |
|
|
|
|
|
|
|
serde_json::to_value(value) |
|
|
|
|
|
|
|
.expect("canonicaljsonobject is valid value"), |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.expect( |
|
|
|
|
|
|
|
"This is possible because handle_incoming_pdu worked", |
|
|
|
|
|
|
|
), |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
Err(e) => { |
|
|
|
Err(e) => { |
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
|