|
|
|
@ -64,6 +64,7 @@ use std::{ |
|
|
|
future::Future, |
|
|
|
future::Future, |
|
|
|
mem, |
|
|
|
mem, |
|
|
|
net::{IpAddr, SocketAddr}, |
|
|
|
net::{IpAddr, SocketAddr}, |
|
|
|
|
|
|
|
ops::Deref, |
|
|
|
pin::Pin, |
|
|
|
pin::Pin, |
|
|
|
sync::{Arc, RwLock, RwLockWriteGuard}, |
|
|
|
sync::{Arc, RwLock, RwLockWriteGuard}, |
|
|
|
time::{Duration, Instant, SystemTime}, |
|
|
|
time::{Duration, Instant, SystemTime}, |
|
|
|
@ -396,10 +397,7 @@ async fn find_actual_destination( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if let Some(port) = force_port { |
|
|
|
if let Some(port) = force_port { |
|
|
|
FedDest::Named( |
|
|
|
FedDest::Named(delegated_hostname, format!(":{}", port)) |
|
|
|
delegated_hostname, |
|
|
|
|
|
|
|
format!(":{}", port.to_string()), |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
add_port_to_hostname(&delegated_hostname) |
|
|
|
add_port_to_hostname(&delegated_hostname) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -432,10 +430,7 @@ async fn find_actual_destination( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if let Some(port) = force_port { |
|
|
|
if let Some(port) = force_port { |
|
|
|
FedDest::Named( |
|
|
|
FedDest::Named(hostname.clone(), format!(":{}", port)) |
|
|
|
hostname.clone(), |
|
|
|
|
|
|
|
format!(":{}", port.to_string()), |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
add_port_to_hostname(&hostname) |
|
|
|
add_port_to_hostname(&hostname) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -550,12 +545,11 @@ pub fn get_server_keys_route(db: DatabaseGuard) -> Json<String> { |
|
|
|
return Json("Federation is disabled.".to_owned()); |
|
|
|
return Json("Federation is disabled.".to_owned()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let mut verify_keys = BTreeMap::new(); |
|
|
|
let mut verify_keys: BTreeMap<Box<ServerSigningKeyId>, VerifyKey> = BTreeMap::new(); |
|
|
|
verify_keys.insert( |
|
|
|
verify_keys.insert( |
|
|
|
ServerSigningKeyId::try_from( |
|
|
|
format!("ed25519:{}", db.globals.keypair().version()) |
|
|
|
format!("ed25519:{}", db.globals.keypair().version()).as_str(), |
|
|
|
.try_into() |
|
|
|
) |
|
|
|
.expect("found invalid server signing keys in DB"), |
|
|
|
.expect("found invalid server signing keys in DB"), |
|
|
|
|
|
|
|
VerifyKey { |
|
|
|
VerifyKey { |
|
|
|
key: base64::encode_config(db.globals.keypair().public_key(), base64::STANDARD_NO_PAD), |
|
|
|
key: base64::encode_config(db.globals.keypair().public_key(), base64::STANDARD_NO_PAD), |
|
|
|
}, |
|
|
|
}, |
|
|
|
@ -725,7 +719,7 @@ pub async fn send_transaction_message_route( |
|
|
|
|
|
|
|
|
|
|
|
for pdu in &body.pdus { |
|
|
|
for pdu in &body.pdus { |
|
|
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
|
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
|
|
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu, &db) { |
|
|
|
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu) { |
|
|
|
Ok(t) => t, |
|
|
|
Ok(t) => t, |
|
|
|
Err(_) => { |
|
|
|
Err(_) => { |
|
|
|
// Event could not be converted to canonical json
|
|
|
|
// Event could not be converted to canonical json
|
|
|
|
@ -736,7 +730,7 @@ pub async fn send_transaction_message_route( |
|
|
|
// 0. Check the server is in the room
|
|
|
|
// 0. Check the server is in the room
|
|
|
|
let room_id = match value |
|
|
|
let room_id = match value |
|
|
|
.get("room_id") |
|
|
|
.get("room_id") |
|
|
|
.and_then(|id| RoomId::try_from(id.as_str()?).ok()) |
|
|
|
.and_then(|id| RoomId::parse(id.as_str()?).ok()) |
|
|
|
{ |
|
|
|
{ |
|
|
|
Some(id) => id, |
|
|
|
Some(id) => id, |
|
|
|
None => { |
|
|
|
None => { |
|
|
|
@ -1001,14 +995,9 @@ pub(crate) async fn handle_incoming_pdu<'a>( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
|
|
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
|
|
|
let mut graph = HashMap::new(); |
|
|
|
let mut graph: HashMap<Arc<EventId>, _> = HashMap::new(); |
|
|
|
let mut eventid_info = HashMap::new(); |
|
|
|
let mut eventid_info = HashMap::new(); |
|
|
|
let mut todo_outlier_stack: Vec<_> = incoming_pdu |
|
|
|
let mut todo_outlier_stack: Vec<Arc<EventId>> = incoming_pdu.prev_events.clone(); |
|
|
|
.prev_events |
|
|
|
|
|
|
|
.iter() |
|
|
|
|
|
|
|
.cloned() |
|
|
|
|
|
|
|
.map(Arc::new) |
|
|
|
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut amount = 0; |
|
|
|
let mut amount = 0; |
|
|
|
|
|
|
|
|
|
|
|
@ -1027,7 +1016,7 @@ pub(crate) async fn handle_incoming_pdu<'a>( |
|
|
|
if amount > 100 { |
|
|
|
if amount > 100 { |
|
|
|
// Max limit reached
|
|
|
|
// Max limit reached
|
|
|
|
warn!("Max prev event limit reached!"); |
|
|
|
warn!("Max prev event limit reached!"); |
|
|
|
graph.insert((*prev_event_id).clone(), HashSet::new()); |
|
|
|
graph.insert(prev_event_id.clone(), HashSet::new()); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1038,27 +1027,27 @@ pub(crate) async fn handle_incoming_pdu<'a>( |
|
|
|
amount += 1; |
|
|
|
amount += 1; |
|
|
|
for prev_prev in &pdu.prev_events { |
|
|
|
for prev_prev in &pdu.prev_events { |
|
|
|
if !graph.contains_key(prev_prev) { |
|
|
|
if !graph.contains_key(prev_prev) { |
|
|
|
todo_outlier_stack.push(dbg!(Arc::new(prev_prev.clone()))); |
|
|
|
todo_outlier_stack.push(dbg!(prev_prev.clone())); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
graph.insert( |
|
|
|
graph.insert( |
|
|
|
(*prev_event_id).clone(), |
|
|
|
prev_event_id.clone(), |
|
|
|
pdu.prev_events.iter().cloned().collect(), |
|
|
|
pdu.prev_events.iter().cloned().collect(), |
|
|
|
); |
|
|
|
); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// Time based check failed
|
|
|
|
// Time based check failed
|
|
|
|
graph.insert((*prev_event_id).clone(), HashSet::new()); |
|
|
|
graph.insert(prev_event_id.clone(), HashSet::new()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
eventid_info.insert(prev_event_id.clone(), (pdu, json)); |
|
|
|
eventid_info.insert(prev_event_id.clone(), (pdu, json)); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// Get json failed
|
|
|
|
// Get json failed
|
|
|
|
graph.insert((*prev_event_id).clone(), HashSet::new()); |
|
|
|
graph.insert(prev_event_id.clone(), HashSet::new()); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// Fetch and handle failed
|
|
|
|
// Fetch and handle failed
|
|
|
|
graph.insert((*prev_event_id).clone(), HashSet::new()); |
|
|
|
graph.insert(prev_event_id.clone(), HashSet::new()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1074,7 +1063,6 @@ pub(crate) async fn handle_incoming_pdu<'a>( |
|
|
|
.get(event_id) |
|
|
|
.get(event_id) |
|
|
|
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts), |
|
|
|
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts), |
|
|
|
), |
|
|
|
), |
|
|
|
ruma::event_id!("$notimportant"), |
|
|
|
|
|
|
|
)) |
|
|
|
)) |
|
|
|
}) |
|
|
|
}) |
|
|
|
.map_err(|_| "Error sorting prev events".to_owned())?; |
|
|
|
.map_err(|_| "Error sorting prev events".to_owned())?; |
|
|
|
@ -1084,7 +1072,7 @@ pub(crate) async fn handle_incoming_pdu<'a>( |
|
|
|
if errors >= 5 { |
|
|
|
if errors >= 5 { |
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
if let Some((pdu, json)) = eventid_info.remove(&prev_id) { |
|
|
|
if let Some((pdu, json)) = eventid_info.remove(&*prev_id) { |
|
|
|
if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { |
|
|
|
if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
@ -1200,8 +1188,7 @@ fn handle_outlier_pdu<'a>( |
|
|
|
&incoming_pdu |
|
|
|
&incoming_pdu |
|
|
|
.auth_events |
|
|
|
.auth_events |
|
|
|
.iter() |
|
|
|
.iter() |
|
|
|
.cloned() |
|
|
|
.map(|x| Arc::from(&**x)) |
|
|
|
.map(Arc::new) |
|
|
|
|
|
|
|
.collect::<Vec<_>>(), |
|
|
|
.collect::<Vec<_>>(), |
|
|
|
create_event, |
|
|
|
create_event, |
|
|
|
room_id, |
|
|
|
room_id, |
|
|
|
@ -1331,7 +1318,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
let mut state_at_incoming_event = None; |
|
|
|
let mut state_at_incoming_event = None; |
|
|
|
|
|
|
|
|
|
|
|
if incoming_pdu.prev_events.len() == 1 { |
|
|
|
if incoming_pdu.prev_events.len() == 1 { |
|
|
|
let prev_event = &incoming_pdu.prev_events[0]; |
|
|
|
let prev_event = &*incoming_pdu.prev_events[0]; |
|
|
|
let prev_event_sstatehash = db |
|
|
|
let prev_event_sstatehash = db |
|
|
|
.rooms |
|
|
|
.rooms |
|
|
|
.pdu_shortstatehash(prev_event) |
|
|
|
.pdu_shortstatehash(prev_event) |
|
|
|
@ -1353,7 +1340,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals) |
|
|
|
.get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals) |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
|
|
|
|
|
|
|
|
state.insert(shortstatekey, Arc::new(prev_event.clone())); |
|
|
|
state.insert(shortstatekey, Arc::from(prev_event)); |
|
|
|
// Now it's the state after the pdu
|
|
|
|
// Now it's the state after the pdu
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1397,7 +1384,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.rooms |
|
|
|
.rooms |
|
|
|
.get_or_create_shortstatekey(&prev_event.kind, state_key, &db.globals) |
|
|
|
.get_or_create_shortstatekey(&prev_event.kind, state_key, &db.globals) |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
leaf_state.insert(shortstatekey, Arc::new(prev_event.event_id.clone())); |
|
|
|
leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); |
|
|
|
// Now it's the state after the pdu
|
|
|
|
// Now it's the state after the pdu
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1410,14 +1397,13 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.get_statekey_from_short(k) |
|
|
|
.get_statekey_from_short(k) |
|
|
|
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; |
|
|
|
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; |
|
|
|
|
|
|
|
|
|
|
|
state.insert(k, (*id).clone()); |
|
|
|
state.insert(k, id.clone()); |
|
|
|
starting_events.push(id); |
|
|
|
starting_events.push(id); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
auth_chain_sets.push( |
|
|
|
auth_chain_sets.push( |
|
|
|
get_auth_chain(room_id, starting_events, db) |
|
|
|
get_auth_chain(room_id, starting_events, db) |
|
|
|
.map_err(|_| "Failed to load auth chain.".to_owned())? |
|
|
|
.map_err(|_| "Failed to load auth chain.".to_owned())? |
|
|
|
.map(|event_id| (*event_id).clone()) |
|
|
|
|
|
|
|
.collect(), |
|
|
|
.collect(), |
|
|
|
); |
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
@ -1444,7 +1430,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.rooms |
|
|
|
.rooms |
|
|
|
.get_or_create_shortstatekey(&event_type, &state_key, &db.globals) |
|
|
|
.get_or_create_shortstatekey(&event_type, &state_key, &db.globals) |
|
|
|
.map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; |
|
|
|
.map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; |
|
|
|
Ok((shortstatekey, Arc::new(event_id))) |
|
|
|
Ok((shortstatekey, event_id)) |
|
|
|
}) |
|
|
|
}) |
|
|
|
.collect::<Result<_, String>>()?, |
|
|
|
.collect::<Result<_, String>>()?, |
|
|
|
), |
|
|
|
), |
|
|
|
@ -1479,8 +1465,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
origin, |
|
|
|
origin, |
|
|
|
&res.pdu_ids |
|
|
|
&res.pdu_ids |
|
|
|
.iter() |
|
|
|
.iter() |
|
|
|
.cloned() |
|
|
|
.map(|x| Arc::from(&**x)) |
|
|
|
.map(Arc::new) |
|
|
|
|
|
|
|
.collect::<Vec<_>>(), |
|
|
|
.collect::<Vec<_>>(), |
|
|
|
create_event, |
|
|
|
create_event, |
|
|
|
room_id, |
|
|
|
room_id, |
|
|
|
@ -1488,7 +1473,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
) |
|
|
|
) |
|
|
|
.await; |
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
|
|
let mut state = BTreeMap::new(); |
|
|
|
let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new(); |
|
|
|
for (pdu, _) in state_vec { |
|
|
|
for (pdu, _) in state_vec { |
|
|
|
let state_key = pdu |
|
|
|
let state_key = pdu |
|
|
|
.state_key |
|
|
|
.state_key |
|
|
|
@ -1502,7 +1487,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
|
|
|
|
|
|
|
|
match state.entry(shortstatekey) { |
|
|
|
match state.entry(shortstatekey) { |
|
|
|
btree_map::Entry::Vacant(v) => { |
|
|
|
btree_map::Entry::Vacant(v) => { |
|
|
|
v.insert(Arc::new(pdu.event_id.clone())); |
|
|
|
v.insert(Arc::from(&*pdu.event_id)); |
|
|
|
} |
|
|
|
} |
|
|
|
btree_map::Entry::Occupied(_) => return Err( |
|
|
|
btree_map::Entry::Occupied(_) => return Err( |
|
|
|
"State event's type and state_key combination exists multiple times." |
|
|
|
"State event's type and state_key combination exists multiple times." |
|
|
|
@ -1577,7 +1562,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.roomid_mutex_state |
|
|
|
.roomid_mutex_state |
|
|
|
.write() |
|
|
|
.write() |
|
|
|
.unwrap() |
|
|
|
.unwrap() |
|
|
|
.entry(room_id.clone()) |
|
|
|
.entry(room_id.to_owned()) |
|
|
|
.or_default(), |
|
|
|
.or_default(), |
|
|
|
); |
|
|
|
); |
|
|
|
let state_lock = mutex_state.lock().await; |
|
|
|
let state_lock = mutex_state.lock().await; |
|
|
|
@ -1647,7 +1632,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
db, |
|
|
|
db, |
|
|
|
&incoming_pdu, |
|
|
|
&incoming_pdu, |
|
|
|
val, |
|
|
|
val, |
|
|
|
extremities, |
|
|
|
extremities.iter().map(Deref::deref), |
|
|
|
state_ids_compressed, |
|
|
|
state_ids_compressed, |
|
|
|
soft_fail, |
|
|
|
soft_fail, |
|
|
|
&state_lock, |
|
|
|
&state_lock, |
|
|
|
@ -1715,7 +1700,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.rooms |
|
|
|
.rooms |
|
|
|
.get_or_create_shortstatekey(&leaf_pdu.kind, state_key, &db.globals) |
|
|
|
.get_or_create_shortstatekey(&leaf_pdu.kind, state_key, &db.globals) |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
leaf_state.insert(shortstatekey, Arc::new(leaf_pdu.event_id.clone())); |
|
|
|
leaf_state.insert(shortstatekey, Arc::from(&*leaf_pdu.event_id)); |
|
|
|
// Now it's the state after the pdu
|
|
|
|
// Now it's the state after the pdu
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1730,7 +1715,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals) |
|
|
|
.get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals) |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; |
|
|
|
|
|
|
|
|
|
|
|
state_after.insert(shortstatekey, Arc::new(incoming_pdu.event_id.clone())); |
|
|
|
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); |
|
|
|
} |
|
|
|
} |
|
|
|
fork_states.push(state_after); |
|
|
|
fork_states.push(state_after); |
|
|
|
|
|
|
|
|
|
|
|
@ -1762,7 +1747,6 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
db, |
|
|
|
db, |
|
|
|
) |
|
|
|
) |
|
|
|
.map_err(|_| "Failed to load auth chain.".to_owned())? |
|
|
|
.map_err(|_| "Failed to load auth chain.".to_owned())? |
|
|
|
.map(|event_id| (*event_id).clone()) |
|
|
|
|
|
|
|
.collect(), |
|
|
|
.collect(), |
|
|
|
); |
|
|
|
); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -1771,11 +1755,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
.map(|map| { |
|
|
|
.map(|map| { |
|
|
|
map.into_iter() |
|
|
|
map.into_iter() |
|
|
|
.map(|(k, id)| { |
|
|
|
.map(|(k, id)| db.rooms.get_statekey_from_short(k).map(|k| (k, id))) |
|
|
|
db.rooms |
|
|
|
|
|
|
|
.get_statekey_from_short(k) |
|
|
|
|
|
|
|
.map(|k| (k, (*id).clone())) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.collect::<Result<StateMap<_>>>() |
|
|
|
.collect::<Result<StateMap<_>>>() |
|
|
|
}) |
|
|
|
}) |
|
|
|
.collect::<Result<_>>() |
|
|
|
.collect::<Result<_>>() |
|
|
|
@ -1832,7 +1812,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
db, |
|
|
|
db, |
|
|
|
&incoming_pdu, |
|
|
|
&incoming_pdu, |
|
|
|
val, |
|
|
|
val, |
|
|
|
extremities, |
|
|
|
extremities.iter().map(Deref::deref), |
|
|
|
state_ids_compressed, |
|
|
|
state_ids_compressed, |
|
|
|
soft_fail, |
|
|
|
soft_fail, |
|
|
|
&state_lock, |
|
|
|
&state_lock, |
|
|
|
@ -1874,7 +1854,8 @@ pub(crate) fn fetch_and_handle_outliers<'a>( |
|
|
|
|
|
|
|
|
|
|
|
let mut pdus = vec![]; |
|
|
|
let mut pdus = vec![]; |
|
|
|
for id in events { |
|
|
|
for id in events { |
|
|
|
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(id) { |
|
|
|
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&**id) |
|
|
|
|
|
|
|
{ |
|
|
|
// Exponential backoff
|
|
|
|
// Exponential backoff
|
|
|
|
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); |
|
|
|
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); |
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { |
|
|
|
@ -1911,10 +1892,10 @@ pub(crate) fn fetch_and_handle_outliers<'a>( |
|
|
|
Ok(res) => { |
|
|
|
Ok(res) => { |
|
|
|
warn!("Got {} over federation", id); |
|
|
|
warn!("Got {} over federation", id); |
|
|
|
let (calculated_event_id, value) = |
|
|
|
let (calculated_event_id, value) = |
|
|
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu, &db) { |
|
|
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { |
|
|
|
Ok(t) => t, |
|
|
|
Ok(t) => t, |
|
|
|
Err(_) => { |
|
|
|
Err(_) => { |
|
|
|
back_off((**id).clone()); |
|
|
|
back_off((**id).to_owned()); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
@ -1939,14 +1920,14 @@ pub(crate) fn fetch_and_handle_outliers<'a>( |
|
|
|
Ok((pdu, json)) => (pdu, Some(json)), |
|
|
|
Ok((pdu, json)) => (pdu, Some(json)), |
|
|
|
Err(e) => { |
|
|
|
Err(e) => { |
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
warn!("Authentication of event {} failed: {:?}", id, e); |
|
|
|
back_off((**id).clone()); |
|
|
|
back_off((**id).to_owned()); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
Err(_) => { |
|
|
|
Err(_) => { |
|
|
|
warn!("Failed to fetch event: {}", id); |
|
|
|
warn!("Failed to fetch event: {}", id); |
|
|
|
back_off((**id).clone()); |
|
|
|
back_off((**id).to_owned()); |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -2124,11 +2105,11 @@ pub(crate) async fn fetch_signing_keys( |
|
|
|
/// Append the incoming event setting the state snapshot to the state from the
|
|
|
|
/// Append the incoming event setting the state snapshot to the state from the
|
|
|
|
/// server that sent the event.
|
|
|
|
/// server that sent the event.
|
|
|
|
#[tracing::instrument(skip(db, pdu, pdu_json, new_room_leaves, state_ids_compressed, _mutex_lock))] |
|
|
|
#[tracing::instrument(skip(db, pdu, pdu_json, new_room_leaves, state_ids_compressed, _mutex_lock))] |
|
|
|
fn append_incoming_pdu( |
|
|
|
fn append_incoming_pdu<'a>( |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu_json: CanonicalJsonObject, |
|
|
|
pdu_json: CanonicalJsonObject, |
|
|
|
new_room_leaves: HashSet<EventId>, |
|
|
|
new_room_leaves: impl IntoIterator<Item = &'a EventId> + Clone + Debug, |
|
|
|
state_ids_compressed: HashSet<CompressedStateEvent>, |
|
|
|
state_ids_compressed: HashSet<CompressedStateEvent>, |
|
|
|
soft_fail: bool, |
|
|
|
soft_fail: bool, |
|
|
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
|
|
|
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
|
|
|
|
@ -2145,19 +2126,11 @@ fn append_incoming_pdu( |
|
|
|
if soft_fail { |
|
|
|
if soft_fail { |
|
|
|
db.rooms |
|
|
|
db.rooms |
|
|
|
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; |
|
|
|
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; |
|
|
|
db.rooms.replace_pdu_leaves( |
|
|
|
db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?; |
|
|
|
&pdu.room_id, |
|
|
|
|
|
|
|
&new_room_leaves.into_iter().collect::<Vec<_>>(), |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
return Ok(None); |
|
|
|
return Ok(None); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let pdu_id = db.rooms.append_pdu( |
|
|
|
let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?; |
|
|
|
pdu, |
|
|
|
|
|
|
|
pdu_json, |
|
|
|
|
|
|
|
&new_room_leaves.into_iter().collect::<Vec<_>>(), |
|
|
|
|
|
|
|
db, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for appservice in db.appservice.all()? { |
|
|
|
for appservice in db.appservice.all()? { |
|
|
|
if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { |
|
|
|
if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { |
|
|
|
@ -2298,13 +2271,13 @@ fn get_auth_chain_inner( |
|
|
|
event_id: &EventId, |
|
|
|
event_id: &EventId, |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
) -> Result<HashSet<u64>> { |
|
|
|
) -> Result<HashSet<u64>> { |
|
|
|
let mut todo = vec![event_id.clone()]; |
|
|
|
let mut todo = vec![Arc::from(event_id)]; |
|
|
|
let mut found = HashSet::new(); |
|
|
|
let mut found = HashSet::new(); |
|
|
|
|
|
|
|
|
|
|
|
while let Some(event_id) = todo.pop() { |
|
|
|
while let Some(event_id) = todo.pop() { |
|
|
|
match db.rooms.get_pdu(&event_id) { |
|
|
|
match db.rooms.get_pdu(&event_id) { |
|
|
|
Ok(Some(pdu)) => { |
|
|
|
Ok(Some(pdu)) => { |
|
|
|
if &pdu.room_id != room_id { |
|
|
|
if pdu.room_id != room_id { |
|
|
|
return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db")); |
|
|
|
return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db")); |
|
|
|
} |
|
|
|
} |
|
|
|
for auth_event in &pdu.auth_events { |
|
|
|
for auth_event in &pdu.auth_events { |
|
|
|
@ -2363,10 +2336,10 @@ pub fn get_event_route( |
|
|
|
.and_then(|val| val.as_str()) |
|
|
|
.and_then(|val| val.as_str()) |
|
|
|
.ok_or_else(|| Error::bad_database("Invalid event in database"))?; |
|
|
|
.ok_or_else(|| Error::bad_database("Invalid event in database"))?; |
|
|
|
|
|
|
|
|
|
|
|
let room_id = RoomId::try_from(room_id_str) |
|
|
|
let room_id = <&RoomId>::try_from(room_id_str) |
|
|
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; |
|
|
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; |
|
|
|
|
|
|
|
|
|
|
|
if !db.rooms.server_in_room(sender_servername, &room_id)? { |
|
|
|
if !db.rooms.server_in_room(sender_servername, room_id)? { |
|
|
|
return Err(Error::BadRequest(ErrorKind::NotFound, "Event not found.")); |
|
|
|
return Err(Error::BadRequest(ErrorKind::NotFound, "Event not found.")); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -2417,7 +2390,7 @@ pub fn get_missing_events_route( |
|
|
|
.and_then(|val| val.as_str()) |
|
|
|
.and_then(|val| val.as_str()) |
|
|
|
.ok_or_else(|| Error::bad_database("Invalid event in database"))?; |
|
|
|
.ok_or_else(|| Error::bad_database("Invalid event in database"))?; |
|
|
|
|
|
|
|
|
|
|
|
let event_room_id = RoomId::try_from(room_id_str) |
|
|
|
let event_room_id = <&RoomId>::try_from(room_id_str) |
|
|
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; |
|
|
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; |
|
|
|
|
|
|
|
|
|
|
|
if event_room_id != body.room_id { |
|
|
|
if event_room_id != body.room_id { |
|
|
|
@ -2436,7 +2409,7 @@ pub fn get_missing_events_route( |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
queued_events.extend_from_slice( |
|
|
|
queued_events.extend_from_slice( |
|
|
|
&serde_json::from_value::<Vec<EventId>>( |
|
|
|
&serde_json::from_value::<Vec<Box<EventId>>>( |
|
|
|
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { |
|
|
|
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { |
|
|
|
Error::bad_database("Event in db has no prev_events field.") |
|
|
|
Error::bad_database("Event in db has no prev_events field.") |
|
|
|
})?) |
|
|
|
})?) |
|
|
|
@ -2485,14 +2458,14 @@ pub fn get_event_authorization_route( |
|
|
|
.and_then(|val| val.as_str()) |
|
|
|
.and_then(|val| val.as_str()) |
|
|
|
.ok_or_else(|| Error::bad_database("Invalid event in database"))?; |
|
|
|
.ok_or_else(|| Error::bad_database("Invalid event in database"))?; |
|
|
|
|
|
|
|
|
|
|
|
let room_id = RoomId::try_from(room_id_str) |
|
|
|
let room_id = <&RoomId>::try_from(room_id_str) |
|
|
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; |
|
|
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; |
|
|
|
|
|
|
|
|
|
|
|
if !db.rooms.server_in_room(sender_servername, &room_id)? { |
|
|
|
if !db.rooms.server_in_room(sender_servername, room_id)? { |
|
|
|
return Err(Error::BadRequest(ErrorKind::NotFound, "Event not found.")); |
|
|
|
return Err(Error::BadRequest(ErrorKind::NotFound, "Event not found.")); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let auth_chain_ids = get_auth_chain(&room_id, vec![Arc::new(body.event_id.clone())], &db)?; |
|
|
|
let auth_chain_ids = get_auth_chain(room_id, vec![Arc::from(&*body.event_id)], &db)?; |
|
|
|
|
|
|
|
|
|
|
|
Ok(get_event_authorization::v1::Response { |
|
|
|
Ok(get_event_authorization::v1::Response { |
|
|
|
auth_chain: auth_chain_ids |
|
|
|
auth_chain: auth_chain_ids |
|
|
|
@ -2550,7 +2523,7 @@ pub fn get_room_state_route( |
|
|
|
}) |
|
|
|
}) |
|
|
|
.collect(); |
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
|
|
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::new(body.event_id.clone())], &db)?; |
|
|
|
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?; |
|
|
|
|
|
|
|
|
|
|
|
Ok(get_room_state::v1::Response { |
|
|
|
Ok(get_room_state::v1::Response { |
|
|
|
auth_chain: auth_chain_ids |
|
|
|
auth_chain: auth_chain_ids |
|
|
|
@ -2606,13 +2579,13 @@ pub fn get_room_state_ids_route( |
|
|
|
.rooms |
|
|
|
.rooms |
|
|
|
.state_full_ids(shortstatehash)? |
|
|
|
.state_full_ids(shortstatehash)? |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
.map(|(_, id)| (*id).clone()) |
|
|
|
.map(|(_, id)| (*id).to_owned()) |
|
|
|
.collect(); |
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
|
|
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::new(body.event_id.clone())], &db)?; |
|
|
|
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?; |
|
|
|
|
|
|
|
|
|
|
|
Ok(get_room_state_ids::v1::Response { |
|
|
|
Ok(get_room_state_ids::v1::Response { |
|
|
|
auth_chain_ids: auth_chain_ids.map(|id| (*id).clone()).collect(), |
|
|
|
auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(), |
|
|
|
pdu_ids, |
|
|
|
pdu_ids, |
|
|
|
} |
|
|
|
} |
|
|
|
.into()) |
|
|
|
.into()) |
|
|
|
@ -2670,12 +2643,9 @@ pub fn create_join_event_template_route( |
|
|
|
None |
|
|
|
None |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// If there was no create event yet, assume we are creating a room with the default version
|
|
|
|
// If there was no create event yet, assume we are creating a version 6 room right now
|
|
|
|
// right now
|
|
|
|
let room_version_id = |
|
|
|
let room_version_id = create_event_content |
|
|
|
create_event_content.map_or(RoomVersionId::V6, |create_event| create_event.room_version); |
|
|
|
.map_or(db.globals.default_room_version(), |create_event| { |
|
|
|
|
|
|
|
create_event.room_version |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); |
|
|
|
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); |
|
|
|
|
|
|
|
|
|
|
|
if !body.ver.contains(&room_version_id) { |
|
|
|
if !body.ver.contains(&room_version_id) { |
|
|
|
@ -2695,6 +2665,7 @@ pub fn create_join_event_template_route( |
|
|
|
membership: MembershipState::Join, |
|
|
|
membership: MembershipState::Join, |
|
|
|
third_party_invite: None, |
|
|
|
third_party_invite: None, |
|
|
|
reason: None, |
|
|
|
reason: None, |
|
|
|
|
|
|
|
join_authorized_via_users_server: None, |
|
|
|
}) |
|
|
|
}) |
|
|
|
.expect("member event is valid value"); |
|
|
|
.expect("member event is valid value"); |
|
|
|
|
|
|
|
|
|
|
|
@ -2728,7 +2699,7 @@ pub fn create_join_event_template_route( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let pdu = PduEvent { |
|
|
|
let pdu = PduEvent { |
|
|
|
event_id: ruma::event_id!("$thiswillbefilledinlater"), |
|
|
|
event_id: ruma::event_id!("$thiswillbefilledinlater").into(), |
|
|
|
room_id: body.room_id.clone(), |
|
|
|
room_id: body.room_id.clone(), |
|
|
|
sender: body.user_id.clone(), |
|
|
|
sender: body.user_id.clone(), |
|
|
|
origin_server_ts: utils::millis_since_unix_epoch() |
|
|
|
origin_server_ts: utils::millis_since_unix_epoch() |
|
|
|
@ -2815,7 +2786,7 @@ async fn create_join_event( |
|
|
|
// let mut auth_cache = EventMap::new();
|
|
|
|
// let mut auth_cache = EventMap::new();
|
|
|
|
|
|
|
|
|
|
|
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
|
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
|
|
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu, &db) { |
|
|
|
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu) { |
|
|
|
Ok(t) => t, |
|
|
|
Ok(t) => t, |
|
|
|
Err(_) => { |
|
|
|
Err(_) => { |
|
|
|
// Event could not be converted to canonical json
|
|
|
|
// Event could not be converted to canonical json
|
|
|
|
@ -2840,7 +2811,7 @@ async fn create_join_event( |
|
|
|
.roomid_mutex_federation |
|
|
|
.roomid_mutex_federation |
|
|
|
.write() |
|
|
|
.write() |
|
|
|
.unwrap() |
|
|
|
.unwrap() |
|
|
|
.entry(room_id.clone()) |
|
|
|
.entry(room_id.to_owned()) |
|
|
|
.or_default(), |
|
|
|
.or_default(), |
|
|
|
); |
|
|
|
); |
|
|
|
let mutex_lock = mutex.lock().await; |
|
|
|
let mutex_lock = mutex.lock().await; |
|
|
|
@ -2939,7 +2910,7 @@ pub async fn create_invite_route( |
|
|
|
return Err(Error::bad_config("Federation is disabled.")); |
|
|
|
return Err(Error::bad_config("Federation is disabled.")); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if !db.rooms.is_supported_version(&db, &body.room_version) { |
|
|
|
if body.room_version != RoomVersionId::V5 && body.room_version != RoomVersionId::V6 { |
|
|
|
return Err(Error::BadRequest( |
|
|
|
return Err(Error::BadRequest( |
|
|
|
ErrorKind::IncompatibleRoomVersion { |
|
|
|
ErrorKind::IncompatibleRoomVersion { |
|
|
|
room_version: body.room_version.clone(), |
|
|
|
room_version: body.room_version.clone(), |
|
|
|
@ -2960,7 +2931,7 @@ pub async fn create_invite_route( |
|
|
|
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?; |
|
|
|
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?; |
|
|
|
|
|
|
|
|
|
|
|
// Generate event id
|
|
|
|
// Generate event id
|
|
|
|
let event_id = EventId::try_from(&*format!( |
|
|
|
let event_id = EventId::parse(format!( |
|
|
|
"${}", |
|
|
|
"${}", |
|
|
|
ruma::signatures::reference_hash(&signed_event, &body.room_version) |
|
|
|
ruma::signatures::reference_hash(&signed_event, &body.room_version) |
|
|
|
.expect("ruma can calculate reference hashes") |
|
|
|
.expect("ruma can calculate reference hashes") |
|
|
|
@ -2973,7 +2944,7 @@ pub async fn create_invite_route( |
|
|
|
CanonicalJsonValue::String(event_id.into()), |
|
|
|
CanonicalJsonValue::String(event_id.into()), |
|
|
|
); |
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
let sender = serde_json::from_value( |
|
|
|
let sender: Box<_> = serde_json::from_value( |
|
|
|
signed_event |
|
|
|
signed_event |
|
|
|
.get("sender") |
|
|
|
.get("sender") |
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
@ -2985,7 +2956,7 @@ pub async fn create_invite_route( |
|
|
|
) |
|
|
|
) |
|
|
|
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "sender is not a user id."))?; |
|
|
|
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "sender is not a user id."))?; |
|
|
|
|
|
|
|
|
|
|
|
let invited_user = serde_json::from_value( |
|
|
|
let invited_user: Box<_> = serde_json::from_value( |
|
|
|
signed_event |
|
|
|
signed_event |
|
|
|
.get("state_key") |
|
|
|
.get("state_key") |
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
.ok_or(Error::BadRequest( |
|
|
|
@ -3236,7 +3207,7 @@ pub(crate) async fn fetch_required_signing_keys( |
|
|
|
|
|
|
|
|
|
|
|
let fetch_res = fetch_signing_keys( |
|
|
|
let fetch_res = fetch_signing_keys( |
|
|
|
db, |
|
|
|
db, |
|
|
|
&Box::<ServerName>::try_from(&**signature_server).map_err(|_| { |
|
|
|
signature_server.as_str().try_into().map_err(|_| { |
|
|
|
Error::BadServerResponse("Invalid servername in signatures of server response pdu.") |
|
|
|
Error::BadServerResponse("Invalid servername in signatures of server response pdu.") |
|
|
|
})?, |
|
|
|
})?, |
|
|
|
signature_ids, |
|
|
|
signature_ids, |
|
|
|
@ -3264,7 +3235,7 @@ pub(crate) async fn fetch_required_signing_keys( |
|
|
|
// the PDUs and either cache the key or add it to the list that needs to be retrieved.
|
|
|
|
// the PDUs and either cache the key or add it to the list that needs to be retrieved.
|
|
|
|
fn get_server_keys_from_cache( |
|
|
|
fn get_server_keys_from_cache( |
|
|
|
pdu: &RawJsonValue, |
|
|
|
pdu: &RawJsonValue, |
|
|
|
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>, |
|
|
|
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<Box<ServerSigningKeyId>, QueryCriteria>>, |
|
|
|
room_version: &RoomVersionId, |
|
|
|
room_version: &RoomVersionId, |
|
|
|
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
@ -3274,19 +3245,20 @@ fn get_server_keys_from_cache( |
|
|
|
Error::BadServerResponse("Invalid PDU in server response") |
|
|
|
Error::BadServerResponse("Invalid PDU in server response") |
|
|
|
})?; |
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
let event_id = EventId::try_from(&*format!( |
|
|
|
let event_id = format!( |
|
|
|
"${}", |
|
|
|
"${}", |
|
|
|
ruma::signatures::reference_hash(&value, room_version) |
|
|
|
ruma::signatures::reference_hash(&value, room_version) |
|
|
|
.expect("ruma can calculate reference hashes") |
|
|
|
.expect("ruma can calculate reference hashes") |
|
|
|
)) |
|
|
|
); |
|
|
|
.expect("ruma's reference hashes are valid event ids"); |
|
|
|
let event_id = <&EventId>::try_from(event_id.as_str()) |
|
|
|
|
|
|
|
.expect("ruma's reference hashes are valid event ids"); |
|
|
|
|
|
|
|
|
|
|
|
if let Some((time, tries)) = db |
|
|
|
if let Some((time, tries)) = db |
|
|
|
.globals |
|
|
|
.globals |
|
|
|
.bad_event_ratelimiter |
|
|
|
.bad_event_ratelimiter |
|
|
|
.read() |
|
|
|
.read() |
|
|
|
.unwrap() |
|
|
|
.unwrap() |
|
|
|
.get(&event_id) |
|
|
|
.get(event_id) |
|
|
|
{ |
|
|
|
{ |
|
|
|
// Exponential backoff
|
|
|
|
// Exponential backoff
|
|
|
|
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); |
|
|
|
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); |
|
|
|
@ -3320,7 +3292,7 @@ fn get_server_keys_from_cache( |
|
|
|
let contains_all_ids = |
|
|
|
let contains_all_ids = |
|
|
|
|keys: &BTreeMap<String, String>| signature_ids.iter().all(|id| keys.contains_key(id)); |
|
|
|
|keys: &BTreeMap<String, String>| signature_ids.iter().all(|id| keys.contains_key(id)); |
|
|
|
|
|
|
|
|
|
|
|
let origin = &Box::<ServerName>::try_from(&**signature_server).map_err(|_| { |
|
|
|
let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|_| { |
|
|
|
Error::BadServerResponse("Invalid servername in signatures of server response pdu.") |
|
|
|
Error::BadServerResponse("Invalid servername in signatures of server response pdu.") |
|
|
|
})?; |
|
|
|
})?; |
|
|
|
|
|
|
|
|
|
|
|
@ -3339,7 +3311,7 @@ fn get_server_keys_from_cache( |
|
|
|
|
|
|
|
|
|
|
|
if !contains_all_ids(&result) { |
|
|
|
if !contains_all_ids(&result) { |
|
|
|
trace!("Signing key not loaded for {}", origin); |
|
|
|
trace!("Signing key not loaded for {}", origin); |
|
|
|
servers.insert(origin.clone(), BTreeMap::new()); |
|
|
|
servers.insert(origin.to_owned(), BTreeMap::new()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub_key_map.insert(origin.to_string(), result); |
|
|
|
pub_key_map.insert(origin.to_string(), result); |
|
|
|
@ -3354,7 +3326,7 @@ pub(crate) async fn fetch_join_signing_keys( |
|
|
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
) -> Result<()> { |
|
|
|
) -> Result<()> { |
|
|
|
let mut servers: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>> = |
|
|
|
let mut servers: BTreeMap<Box<ServerName>, BTreeMap<Box<ServerSigningKeyId>, QueryCriteria>> = |
|
|
|
BTreeMap::new(); |
|
|
|
BTreeMap::new(); |
|
|
|
|
|
|
|
|
|
|
|
{ |
|
|
|
{ |
|
|
|
@ -3388,10 +3360,6 @@ pub(crate) async fn fetch_join_signing_keys( |
|
|
|
server, |
|
|
|
server, |
|
|
|
get_remote_server_keys_batch::v2::Request { |
|
|
|
get_remote_server_keys_batch::v2::Request { |
|
|
|
server_keys: servers.clone(), |
|
|
|
server_keys: servers.clone(), |
|
|
|
minimum_valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time( |
|
|
|
|
|
|
|
SystemTime::now() + Duration::from_secs(60), |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
.expect("time is valid"), |
|
|
|
|
|
|
|
}, |
|
|
|
}, |
|
|
|
) |
|
|
|
) |
|
|
|
.await |
|
|
|
.await |
|
|
|
|