|
|
|
@ -1188,48 +1188,47 @@ pub(crate) fn fetch_and_handle_events<'a>( |
|
|
|
let mut pdus = vec![]; |
|
|
|
let mut pdus = vec![]; |
|
|
|
for id in events { |
|
|
|
for id in events { |
|
|
|
// a. Look at auth cache
|
|
|
|
// a. Look at auth cache
|
|
|
|
let pdu = |
|
|
|
let pdu = match auth_cache.get(id) { |
|
|
|
match auth_cache.get(id) { |
|
|
|
Some(pdu) => { |
|
|
|
|
|
|
|
debug!("Found {} in cache", id); |
|
|
|
|
|
|
|
// We already have the auth chain for events in cache
|
|
|
|
|
|
|
|
pdu.clone() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// b. Look in the main timeline (pduid_pdu tree)
|
|
|
|
|
|
|
|
// c. Look at outlier pdu tree
|
|
|
|
|
|
|
|
// (get_pdu checks both)
|
|
|
|
|
|
|
|
None => match db.rooms.get_pdu(&id)? { |
|
|
|
Some(pdu) => { |
|
|
|
Some(pdu) => { |
|
|
|
debug!("Found {} in cache", id); |
|
|
|
debug!("Found {} in db", id); |
|
|
|
// We already have the auth chain for events in cache
|
|
|
|
// We need to fetch the auth chain
|
|
|
|
pdu.clone() |
|
|
|
let _ = fetch_and_handle_events( |
|
|
|
|
|
|
|
db, |
|
|
|
|
|
|
|
origin, |
|
|
|
|
|
|
|
&pdu.auth_events, |
|
|
|
|
|
|
|
pub_key_map, |
|
|
|
|
|
|
|
auth_cache, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.await?; |
|
|
|
|
|
|
|
Arc::new(pdu) |
|
|
|
} |
|
|
|
} |
|
|
|
// b. Look in the main timeline (pduid_pdu tree)
|
|
|
|
None => { |
|
|
|
// c. Look at outlier pdu tree
|
|
|
|
// d. Ask origin server over federation
|
|
|
|
// (get_pdu checks both)
|
|
|
|
debug!("Fetching {} over federation.", id); |
|
|
|
None => match db.rooms.get_pdu(&id)? { |
|
|
|
match db |
|
|
|
Some(pdu) => { |
|
|
|
.sending |
|
|
|
debug!("Found {} in db", id); |
|
|
|
.send_federation_request( |
|
|
|
// We need to fetch the auth chain
|
|
|
|
&db.globals, |
|
|
|
let _ = fetch_and_handle_events( |
|
|
|
|
|
|
|
db, |
|
|
|
|
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&pdu.auth_events, |
|
|
|
get_event::v1::Request { event_id: &id }, |
|
|
|
pub_key_map, |
|
|
|
|
|
|
|
auth_cache, |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
.await?; |
|
|
|
.await |
|
|
|
Arc::new(pdu) |
|
|
|
{ |
|
|
|
} |
|
|
|
Ok(res) => { |
|
|
|
None => { |
|
|
|
debug!("Got {} over federation: {:?}", id, res); |
|
|
|
// d. Ask origin server over federation
|
|
|
|
let (event_id, mut value) = |
|
|
|
debug!("Fetching {} over federation.", id); |
|
|
|
crate::pdu::gen_event_id_canonical_json(&res.pdu)?; |
|
|
|
match db |
|
|
|
// This will also fetch the auth chain
|
|
|
|
.sending |
|
|
|
match handle_incoming_pdu( |
|
|
|
.send_federation_request( |
|
|
|
|
|
|
|
&db.globals, |
|
|
|
|
|
|
|
origin, |
|
|
|
|
|
|
|
get_event::v1::Request { event_id: &id }, |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.await |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
Ok(res) => { |
|
|
|
|
|
|
|
debug!("Got {} over federation: {:?}", id, res); |
|
|
|
|
|
|
|
let (event_id, value) = |
|
|
|
|
|
|
|
crate::pdu::gen_event_id_canonical_json(&res.pdu)?; |
|
|
|
|
|
|
|
// This will also fetch the auth chain
|
|
|
|
|
|
|
|
match handle_incoming_pdu( |
|
|
|
|
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&event_id, |
|
|
|
&event_id, |
|
|
|
value.clone(), |
|
|
|
value.clone(), |
|
|
|
@ -1240,25 +1239,31 @@ pub(crate) fn fetch_and_handle_events<'a>( |
|
|
|
) |
|
|
|
) |
|
|
|
.await |
|
|
|
.await |
|
|
|
{ |
|
|
|
{ |
|
|
|
Ok(_) => Arc::new(serde_json::from_value( |
|
|
|
Ok(_) => { |
|
|
|
serde_json::to_value(value) |
|
|
|
value.insert( |
|
|
|
.expect("canonicaljsonobject is valid value"), |
|
|
|
"event_id".to_owned(), |
|
|
|
) |
|
|
|
to_canonical_value(&event_id) |
|
|
|
.expect("This is possible because handle_incoming_pdu worked")), |
|
|
|
.expect("EventId is a valid CanonicalJsonValue"), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Arc::new(serde_json::from_value( |
|
|
|
|
|
|
|
serde_json::to_value(value).expect("canonicaljsonobject is valid value"), |
|
|
|
|
|
|
|
).expect("This is possible because handle_incoming_pdu worked")) |
|
|
|
|
|
|
|
} |
|
|
|
Err(e) => { |
|
|
|
Err(e) => { |
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
Err(_) => { |
|
|
|
Err(_) => { |
|
|
|
warn!("Failed to fetch event: {}", id); |
|
|
|
warn!("Failed to fetch event: {}", id); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
} |
|
|
|
}; |
|
|
|
}, |
|
|
|
|
|
|
|
}; |
|
|
|
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); |
|
|
|
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); |
|
|
|
pdus.push(pdu); |
|
|
|
pdus.push(pdu); |
|
|
|
} |
|
|
|
} |
|
|
|
|