|
|
|
@ -18,6 +18,7 @@ use ruma::{ |
|
|
|
OutgoingRequest, |
|
|
|
OutgoingRequest, |
|
|
|
}, |
|
|
|
}, |
|
|
|
directory::{IncomingFilter, IncomingRoomNetwork}, |
|
|
|
directory::{IncomingFilter, IncomingRoomNetwork}, |
|
|
|
|
|
|
|
events::EventType, |
|
|
|
serde::to_canonical_value, |
|
|
|
serde::to_canonical_value, |
|
|
|
signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, |
|
|
|
signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, |
|
|
|
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, |
|
|
|
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, |
|
|
|
@ -483,34 +484,6 @@ pub async fn get_public_rooms_route( |
|
|
|
.into()) |
|
|
|
.into()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] |
|
|
|
|
|
|
|
pub enum PrevEvents<T> { |
|
|
|
|
|
|
|
Sequential(T), |
|
|
|
|
|
|
|
Fork(Vec<T>), |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl<T> IntoIterator for PrevEvents<T> { |
|
|
|
|
|
|
|
type Item = T; |
|
|
|
|
|
|
|
type IntoIter = std::vec::IntoIter<Self::Item>; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn into_iter(self) -> Self::IntoIter { |
|
|
|
|
|
|
|
match self { |
|
|
|
|
|
|
|
Self::Sequential(item) => vec![item].into_iter(), |
|
|
|
|
|
|
|
Self::Fork(list) => list.into_iter(), |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl<T: Clone> PrevEvents<T> { |
|
|
|
|
|
|
|
pub fn new(id: &[T]) -> Self { |
|
|
|
|
|
|
|
match id { |
|
|
|
|
|
|
|
[] => panic!("All events must have previous event"), |
|
|
|
|
|
|
|
[single_id] => Self::Sequential(single_id.clone()), |
|
|
|
|
|
|
|
rest => Self::Fork(rest.to_vec()), |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg_attr(
|
|
|
|
#[cfg_attr(
|
|
|
|
feature = "conduit_bin", |
|
|
|
feature = "conduit_bin", |
|
|
|
put("/_matrix/federation/v1/send/<_>", data = "<body>") |
|
|
|
put("/_matrix/federation/v1/send/<_>", data = "<body>") |
|
|
|
@ -605,8 +578,16 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field"); |
|
|
|
UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field"); |
|
|
|
let origin = sender.server_name(); |
|
|
|
let origin = sender.server_name(); |
|
|
|
|
|
|
|
|
|
|
|
// TODO: this could fail or the server not respond...
|
|
|
|
let keys = match fetch_signing_keys(&db, origin).await { |
|
|
|
let keys = fetch_signing_keys(&db, origin).await?; |
|
|
|
Ok(keys) => keys, |
|
|
|
|
|
|
|
Err(_) => { |
|
|
|
|
|
|
|
resolved_map.insert( |
|
|
|
|
|
|
|
event_id, |
|
|
|
|
|
|
|
Err("Could not find signing keys for this server".to_string()), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
pub_key_map.insert( |
|
|
|
pub_key_map.insert( |
|
|
|
origin.to_string(), |
|
|
|
origin.to_string(), |
|
|
|
@ -769,11 +750,12 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
//
|
|
|
|
//
|
|
|
|
// calculate_forward_extremities takes care of adding the current state if not already in the state sets
|
|
|
|
// calculate_forward_extremities takes care of adding the current state if not already in the state sets
|
|
|
|
// it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
|
|
|
|
// it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
|
|
|
|
let (mut fork_states, fork_ids) = match calculate_forward_extremities( |
|
|
|
let (mut fork_states, extremities) = match calculate_forward_extremities( |
|
|
|
&db, |
|
|
|
&db, |
|
|
|
&pdu, |
|
|
|
&pdu, |
|
|
|
server_name, |
|
|
|
server_name, |
|
|
|
&pub_key_map, |
|
|
|
&pub_key_map, |
|
|
|
|
|
|
|
current_state, |
|
|
|
&mut auth_cache, |
|
|
|
&mut auth_cache, |
|
|
|
) |
|
|
|
) |
|
|
|
.await |
|
|
|
.await |
|
|
|
@ -791,6 +773,7 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
|
|
|
|
|
|
|
|
let fork_states = fork_states.into_iter().collect::<Vec<_>>(); |
|
|
|
let fork_states = fork_states.into_iter().collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut update_state = false; |
|
|
|
// 13. start state-res with all previous forward extremities minus the ones that are in
|
|
|
|
// 13. start state-res with all previous forward extremities minus the ones that are in
|
|
|
|
// the prev_events of this event plus the new one created by this event and use
|
|
|
|
// the prev_events of this event plus the new one created by this event and use
|
|
|
|
// the result as the new room state
|
|
|
|
// the result as the new room state
|
|
|
|
@ -800,11 +783,12 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
} else if fork_states.len() == 1 { |
|
|
|
} else if fork_states.len() == 1 { |
|
|
|
fork_states[0].clone() |
|
|
|
fork_states[0].clone() |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
|
|
|
|
// We do need to force an update to this rooms state
|
|
|
|
|
|
|
|
update_state = true; |
|
|
|
|
|
|
|
|
|
|
|
// TODO: remove this is for current debugging Jan, 15 2021
|
|
|
|
// TODO: remove this is for current debugging Jan, 15 2021
|
|
|
|
let mut number_fetches = 0_u32; |
|
|
|
let mut number_fetches = 0_u32; |
|
|
|
let mut auth_events = vec![]; |
|
|
|
let mut auth_events = vec![]; |
|
|
|
// this keeps track if we error so we can break out of these inner loops
|
|
|
|
|
|
|
|
// to continue on with the incoming PDU's
|
|
|
|
|
|
|
|
for map in &fork_states { |
|
|
|
for map in &fork_states { |
|
|
|
let mut state_auth = vec![]; |
|
|
|
let mut state_auth = vec![]; |
|
|
|
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { |
|
|
|
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { |
|
|
|
@ -821,14 +805,12 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
.await |
|
|
|
.await |
|
|
|
.map(|mut vec| { |
|
|
|
.map(|mut vec| { |
|
|
|
number_fetches += 1; |
|
|
|
number_fetches += 1; |
|
|
|
vec.remove(0) |
|
|
|
vec.pop() |
|
|
|
}) { |
|
|
|
}) { |
|
|
|
Ok(aev) => aev, |
|
|
|
Ok(Some(aev)) => aev, |
|
|
|
Err(_) => { |
|
|
|
_ => { |
|
|
|
resolved_map.insert( |
|
|
|
resolved_map |
|
|
|
event_id.clone(), |
|
|
|
.insert(event_id.clone(), Err("Failed to fetch event".into())); |
|
|
|
Err("Event has been soft failed".into()), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
continue 'main_pdu_loop; |
|
|
|
continue 'main_pdu_loop; |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
}, |
|
|
|
@ -839,20 +821,19 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
} |
|
|
|
} |
|
|
|
info!("{} event's were not in the auth_cache", number_fetches); |
|
|
|
info!("{} event's were not in the auth_cache", number_fetches); |
|
|
|
|
|
|
|
|
|
|
|
let mut event_map = EventMap::new(); |
|
|
|
|
|
|
|
// Add everything we will need to event_map
|
|
|
|
// Add everything we will need to event_map
|
|
|
|
event_map.extend( |
|
|
|
auth_cache.extend( |
|
|
|
auth_events |
|
|
|
auth_events |
|
|
|
.iter() |
|
|
|
.iter() |
|
|
|
.map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone()))) |
|
|
|
.map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone()))) |
|
|
|
.flatten(), |
|
|
|
.flatten(), |
|
|
|
); |
|
|
|
); |
|
|
|
event_map.extend( |
|
|
|
auth_cache.extend( |
|
|
|
incoming_auth_events |
|
|
|
incoming_auth_events |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
.map(|pdu| (pdu.event_id().clone(), pdu)), |
|
|
|
.map(|pdu| (pdu.event_id().clone(), pdu)), |
|
|
|
); |
|
|
|
); |
|
|
|
event_map.extend( |
|
|
|
auth_cache.extend( |
|
|
|
state_at_event |
|
|
|
state_at_event |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)), |
|
|
|
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)), |
|
|
|
@ -873,7 +854,7 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
.map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect()) |
|
|
|
.map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect()) |
|
|
|
.collect(), |
|
|
|
.collect(), |
|
|
|
&mut event_map, |
|
|
|
&mut auth_cache, |
|
|
|
) { |
|
|
|
) { |
|
|
|
Ok(res) => res |
|
|
|
Ok(res) => res |
|
|
|
.into_iter() |
|
|
|
.into_iter() |
|
|
|
@ -905,14 +886,23 @@ pub async fn send_transaction_message_route<'a>( |
|
|
|
); |
|
|
|
); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// Add the event to the DB and update the forward extremities (via roomid_pduleaves).
|
|
|
|
// Add the event to the DB and update the forward extremities (via roomid_pduleaves).
|
|
|
|
append_state(&db, &pdu, &fork_ids)?; |
|
|
|
append_incoming_pdu( |
|
|
|
|
|
|
|
&db, |
|
|
|
|
|
|
|
&pdu, |
|
|
|
|
|
|
|
&extremities, |
|
|
|
|
|
|
|
if update_state { |
|
|
|
|
|
|
|
Some(state_at_forks) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
None |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
|
|
// Event has passed all auth/stateres checks
|
|
|
|
// Event has passed all auth/stateres checks
|
|
|
|
resolved_map.insert(pdu.event_id().clone(), Ok(())); |
|
|
|
resolved_map.insert(pdu.event_id().clone(), Ok(())); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) |
|
|
|
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// An async function that can recursively calls itself.
|
|
|
|
/// An async function that can recursively calls itself.
|
|
|
|
@ -1029,6 +1019,7 @@ async fn fetch_check_auth_events( |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: Batch these async calls so we can wait on multiple at once
|
|
|
|
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) |
|
|
|
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) |
|
|
|
.await |
|
|
|
.await |
|
|
|
.map(|mut vec| { |
|
|
|
.map(|mut vec| { |
|
|
|
@ -1119,6 +1110,7 @@ async fn calculate_forward_extremities( |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu: &PduEvent, |
|
|
|
origin: &ServerName, |
|
|
|
origin: &ServerName, |
|
|
|
pub_key_map: &PublicKeyMap, |
|
|
|
pub_key_map: &PublicKeyMap, |
|
|
|
|
|
|
|
current_state: BTreeMap<(EventType, Option<String>), Arc<PduEvent>>, |
|
|
|
auth_cache: &mut EventMap<Arc<PduEvent>>, |
|
|
|
auth_cache: &mut EventMap<Arc<PduEvent>>, |
|
|
|
) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> { |
|
|
|
) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> { |
|
|
|
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; |
|
|
|
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; |
|
|
|
@ -1126,17 +1118,13 @@ async fn calculate_forward_extremities( |
|
|
|
let mut is_incoming_leaf = true; |
|
|
|
let mut is_incoming_leaf = true; |
|
|
|
// Make sure the incoming event is not already a forward extremity
|
|
|
|
// Make sure the incoming event is not already a forward extremity
|
|
|
|
// FIXME: I think this could happen if different servers send us the same event??
|
|
|
|
// FIXME: I think this could happen if different servers send us the same event??
|
|
|
|
if current_leaves.contains(pdu.event_id()) { |
|
|
|
//
|
|
|
|
is_incoming_leaf = false; |
|
|
|
|
|
|
|
// Not sure what to do here
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// If the incoming event is already referenced by an existing event
|
|
|
|
// If the incoming event is already referenced by an existing event
|
|
|
|
// then do nothing - it's not a candidate to be a new extremity if
|
|
|
|
// then do nothing - it's not a candidate to be a new extremity if
|
|
|
|
// it has been referenced.
|
|
|
|
// it has been referenced.
|
|
|
|
if already_referenced(db, pdu)? { |
|
|
|
if current_leaves.contains(pdu.event_id()) || db.rooms.get_pdu_id(pdu.event_id())?.is_some() { |
|
|
|
is_incoming_leaf = false; |
|
|
|
is_incoming_leaf = false; |
|
|
|
// This event has been dealt with already??
|
|
|
|
// Not sure what to do here
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// TODO:
|
|
|
|
// TODO:
|
|
|
|
@ -1213,29 +1201,54 @@ async fn calculate_forward_extremities( |
|
|
|
|
|
|
|
|
|
|
|
// This guarantees that our current room state is included
|
|
|
|
// This guarantees that our current room state is included
|
|
|
|
if !includes_current_state && current_hash.is_some() { |
|
|
|
if !includes_current_state && current_hash.is_some() { |
|
|
|
fork_states.insert( |
|
|
|
fork_states.insert(current_state); |
|
|
|
db.rooms |
|
|
|
|
|
|
|
.state_full(pdu.room_id(), current_hash.as_ref().unwrap())? |
|
|
|
|
|
|
|
.into_iter() |
|
|
|
|
|
|
|
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) |
|
|
|
|
|
|
|
.collect(), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Ok((fork_states, dbg!(current_leaves))) |
|
|
|
Ok((fork_states, dbg!(current_leaves))) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG)
|
|
|
|
/// Update the room state to be the resolved state and add the fully auth'ed event
|
|
|
|
fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result<bool> { |
|
|
|
/// to the DB.
|
|
|
|
Ok(false) |
|
|
|
///
|
|
|
|
} |
|
|
|
/// TODO: If we force the state we need to validate all events in that state
|
|
|
|
|
|
|
|
/// any events we fetched from another server need to be fully verified?
|
|
|
|
fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> { |
|
|
|
fn append_incoming_pdu( |
|
|
|
|
|
|
|
db: &Database, |
|
|
|
|
|
|
|
pdu: &PduEvent, |
|
|
|
|
|
|
|
new_room_leaves: &[EventId], |
|
|
|
|
|
|
|
state: Option<StateMap<Arc<PduEvent>>>, |
|
|
|
|
|
|
|
) -> Result<()> { |
|
|
|
let count = db.globals.next_count()?; |
|
|
|
let count = db.globals.next_count()?; |
|
|
|
let mut pdu_id = pdu.room_id.as_bytes().to_vec(); |
|
|
|
let mut pdu_id = pdu.room_id.as_bytes().to_vec(); |
|
|
|
pdu_id.push(0xff); |
|
|
|
pdu_id.push(0xff); |
|
|
|
pdu_id.extend_from_slice(&count.to_be_bytes()); |
|
|
|
pdu_id.extend_from_slice(&count.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Update the state of the room if needed
|
|
|
|
|
|
|
|
// We can tell if we need to do this based on wether state resolution took place or not
|
|
|
|
|
|
|
|
if let Some(state) = state { |
|
|
|
|
|
|
|
let new = state |
|
|
|
|
|
|
|
.into_iter() |
|
|
|
|
|
|
|
.map(|((ev, k), pdu)| { |
|
|
|
|
|
|
|
Ok(( |
|
|
|
|
|
|
|
( |
|
|
|
|
|
|
|
ev, |
|
|
|
|
|
|
|
k.ok_or_else(|| Error::Conflict("State contained non state event"))?, |
|
|
|
|
|
|
|
), |
|
|
|
|
|
|
|
db.rooms |
|
|
|
|
|
|
|
.get_pdu_id(pdu.event_id()) |
|
|
|
|
|
|
|
.ok() |
|
|
|
|
|
|
|
.flatten() |
|
|
|
|
|
|
|
.ok_or_else(|| Error::Conflict("Resolved state contained unknown event"))? |
|
|
|
|
|
|
|
.to_vec(), |
|
|
|
|
|
|
|
)) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.collect::<Result<_>>()?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
info!("Force update of state for {:?}", pdu); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
db.rooms.force_state(pdu.room_id(), new, &db.globals)?; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
|
|
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
|
|
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
|
|
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
|
|
|
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; |
|
|
|
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; |
|
|
|
|