Browse Source

Happy path of incoming /send pdu resolution

merge-requests/10/head
Devin Ragotzy 5 years ago
parent
commit
c7525fdd5e
  1. 1
      Cargo.lock
  2. 2
      Cargo.toml
  3. 2
      src/database/rooms.rs
  4. 2
      src/pdu.rs
  5. 490
      src/server_server.rs

1
Cargo.lock generated

@@ -1986,6 +1986,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483"
[[package]]
name = "state-res"
version = "0.1.0"
source = "git+https://github.com/ruma/state-res#dca71f76eea9f1378a54e96e5fc98d71dfbf5dde"
dependencies = [
"itertools",
"maplit",

2
Cargo.toml

@@ -24,7 +24,7 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api",
# Used when doing state resolution
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] }
state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] }
state-res = { git = "https://github.com/ruma/state-res", features = ["unstable-pre-spec", "gen-eventid"] }
# state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
# Used for long polling and federation sender, should be the same as rocket::tokio

2
src/database/rooms.rs

@@ -269,7 +269,7 @@ impl Rooms {
pub fn force_state(
&self,
room_id: &RoomId,
state: &HashMap<(EventType, String), Vec<u8>>,
state: HashMap<(EventType, String), Vec<u8>>,
) -> Result<()> {
let state_hash =
self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::<Vec<_>>())?;

2
src/pdu.rs

@@ -17,7 +17,7 @@ use std::{
time::UNIX_EPOCH,
};
#[derive(Deserialize, Serialize, Debug)]
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct PduEvent {
pub event_id: EventId,
pub room_id: RoomId,

490
src/server_server.rs

@@ -24,7 +24,7 @@ use ruma::{
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
};
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
fmt::Debug,
sync::Arc,
@@ -390,6 +390,22 @@ pub async fn get_public_rooms_route(
.into())
}
/// The `prev_events` of an incoming PDU, classified by how many parents it has.
///
/// `Sequential` means the event extends exactly one parent, so the state can be
/// walked back linearly; `Fork` means the event DAG branched and the branches
/// must be merged (full state resolution) before the event can be applied.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum PrevEvents<T> {
    /// Exactly one previous event.
    Sequential(T),
    /// More than one previous event (a fork in the event DAG).
    Fork(Vec<T>),
}

impl<T: Clone> PrevEvents<T> {
    /// Classifies a slice of previous events as [`Sequential`](Self::Sequential)
    /// (one element) or [`Fork`](Self::Fork) (two or more elements).
    ///
    /// # Panics
    ///
    /// Panics if `ids` is empty: every non-genesis Matrix event must reference
    /// at least one previous event.
    pub fn new(ids: &[T]) -> Self {
        match ids {
            [] => panic!("All events must have previous event"),
            [single_id] => Self::Sequential(single_id.clone()),
            rest => Self::Fork(rest.to_vec()),
        }
    }
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/federation/v1/send/<_>", data = "<body>")
@@ -475,78 +491,140 @@ pub async fn send_transaction_message_route<'a>(
// The events that must be resolved to catch up to the incoming event
let mut missing = vec![];
let mut seen = BTreeMap::new();
let mut prev_ids = pdu.prev_events.to_vec();
// Don't kill our server with state-res
// TODO: set this at a reasonable level this is for debug/wip purposes
if prev_ids.len() > 5 {
resolved_map.insert(
event_id,
Err("Event has abnormally large prev_events count".into()),
);
continue;
}
let mut seen = state_res::StateMap::new();
// This is `while let Some(event_id) = prev_ids.pop()` but with a fancy continue
let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)];
// This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue
// in the case of a failed request to the server that sent the event
'inner: loop {
let id = if let Some(id) = prev_ids.pop() {
id
} else {
break 'inner;
};
// TODO: if this is ever more than 1 at a time we must do actual
// full state resolution not just auth
match prev_ids.pop() {
Some(PrevEvents::Sequential(id)) => match db
.rooms
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
{
// We know the state snapshot for this event's parents so we can simply auth the
// incoming event and append to DB and append to state if it passes
Some(state_hash) => {
seen = db.rooms.state_full(&state_hash)?;
break 'inner;
}
// We need to fill in information about this event's `prev_events` (parents)
None => {
match send_request(
&db.globals,
body.body.origin.clone(),
ruma::api::federation::event::get_event::v1::Request::new(&id),
)
.await
{
Ok(res) => {
let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu);
let prev_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val)
.expect("CanonicalJsonObj is a valid JsonValue"),
)
.expect("all ruma pdus are conduit pdus");
// TODO: do we need this
assert_eq!(room_id, &prev_pdu.room_id);
prev_ids.push(PrevEvents::new(&prev_pdu.prev_events));
missing.push(PrevEvents::Sequential(prev_pdu));
}
// We can't hard fail because there are some valid errors, just
// keep checking PDU's
//
// As an example a possible error
// {"errcode":"M_FORBIDDEN","error":"Host not in room."}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
// We have to give up on this PDU
continue 'outer;
}
};
}
},
Some(PrevEvents::Fork(ids)) => {
error!(
"prev_events > 1: {}",
serde_json::to_string_pretty(&pdu).unwrap()
);
// Don't kill our server with state-res
// TODO: set this at a reasonable level this is for debug/wip purposes
if ids.len() > 5 {
error!(
"prev_events > 1: {}",
serde_json::to_string_pretty(&pdu).unwrap()
);
resolved_map.insert(
event_id,
Err("Previous events are too large for state-res".into()),
);
continue 'outer;
}
match db
.rooms
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
{
// We know the state snapshot for this event's parents so we can simply auth the
// incoming event and append to DB and append to state if it passes
Some(state_hash) => {
seen = db.rooms.state_full(&state_hash)?;
break 'inner;
}
// We need to fill in information about this event's `prev_events` (parents)
None => {
// We have a state event so we need info for state-res
match send_request(
&db.globals,
body.body.origin.clone(),
ruma::api::federation::event::get_event::v1::Request::new(&id),
)
.await
{
Ok(res) => {
let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu);
let prev_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val)
.expect("CanonicalJsonObj is a valid JsonValue"),
// We want this to stay unique in case the fork comes together?
let mut prev_fork_ids = BTreeSet::new();
let mut missing_fork = vec![];
for id in &ids {
match db
.rooms
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
{
// We know the state snapshot for this event's parents so we can simply auth the
// incoming event and append to DB and append to state if it passes
Some(state_hash) => {
seen = db.rooms.state_full(&state_hash)?;
break 'inner;
}
None => match send_request(
&db.globals,
body.body.origin.clone(),
ruma::api::federation::event::get_event::v1::Request::new(&id),
)
.expect("all ruma pdus are conduit pdus");
// TODO: do we need this
assert_eq!(room_id, &prev_pdu.room_id);
prev_ids.extend(prev_pdu.prev_events.to_vec());
missing.push(prev_pdu);
}
// We can't hard fail because there are some valid errors, just
// keep checking PDU's
//
// As an example a possible error
// {"errcode":"M_FORBIDDEN","error":"Host not in room."}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
// We have to give up on this PDU
continue 'outer;
.await
{
Ok(res) => {
let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu);
let prev_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val)
.expect("CanonicalJsonObj is a valid JsonValue"),
)
.expect("all ruma pdus are conduit pdus");
// TODO: do we need this
assert_eq!(room_id, &prev_pdu.room_id);
for id in &prev_pdu.prev_events {
prev_fork_ids.insert(id.clone());
}
missing_fork.push(prev_pdu);
}
// We can't hard fail because there are some valid errors, just
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
// We have to give up on this PDU
continue 'outer;
}
},
}
};
}
prev_ids.push(PrevEvents::new(
&prev_fork_ids.into_iter().collect::<Vec<_>>(),
));
missing.push(PrevEvents::new(&missing_fork));
}
// All done finding missing events
None => {
break 'inner;
}
}
}
// Now build up
// Now build up state
let mut state_snapshot = seen
.iter()
.map(|(k, v)| (k.clone(), v.event_id.clone()))
@@ -556,124 +634,222 @@ pub async fn send_transaction_message_route<'a>(
.map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res()))
.collect::<BTreeMap<_, Arc<state_res::StateEvent>>>();
// TODO: this only accounts for sequentially missing events no holes will be filled
// and I'm still not sure what happens when fork introduces multiple `prev_events`
// and I'm still not sure what happens when a fork introduces multiple `prev_events`
//
// We need to go from oldest (furthest ancestor of the incoming event) to the
// prev_event of the incoming event so we reverse the order oldest -> most recent
for missing_pdu in missing.into_iter().rev() {
// For state events
if missing_pdu.state_key.is_some() {
let missing_pdu = missing_pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
missing_pdu.clone(),
&state_snapshot,
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(true) => {
// TODO: do we need this
assert_eq!(room_id, missing_pdu.room_id());
match missing_pdu {
PrevEvents::Sequential(missing_pdu) => {
// For state events
if missing_pdu.state_key.is_some() {
let missing_pdu = missing_pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
missing_pdu.clone(),
&state_snapshot,
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(true) => {
// TODO: do we need this
assert_eq!(room_id, missing_pdu.room_id());
let count = db.globals.next_count()?;
let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// // Since we know the state from event to event we can do the
// // slightly more efficient force_state
// db.rooms.force_state(
// room_id,
// state_snapshot
// .iter()
// .map(|(k, v)| {
// (
// k.clone(),
// serde_json::to_vec(
// &db.rooms
// .get_pdu(v)
// .expect("db err")
// .expect("we know of all the pdus"),
// )
// .expect("serde can serialize pdus"),
// )
// })
// .collect(),
// )?;
db.rooms
.append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?;
// Now append the new missing event to DB it will be part of the
// next events state_snapshot
db.rooms.append_pdu(
&PduEvent::from(&*missing_pdu),
&utils::to_canonical_object(&*missing_pdu)
.expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
// Only after the state is recorded in the DB can we update the state_snapshot
// This will update the state snapshot so it is correct next loop through
state_snapshot.insert(
(missing_pdu.kind(), missing_pdu.state_key()),
missing_pdu.event_id(),
);
accum_event_map.insert(missing_pdu.event_id(), missing_pdu);
}
Ok(false) => {
error!(
"apply missing: {}",
serde_json::to_string_pretty(&*missing_pdu).unwrap()
);
continue;
}
Err(e) => {
error!("{}", e);
// This is not a fatal error but we do eventually need to handle
// events failing that are not the incoming events
// TODO: what to do when missing events fail (not incoming events)
}
}
// All events between the event we know about and the incoming event need to be accounted
// for
} else {
// TODO: Some auth needs to be done for non state events
if !db
.rooms
.is_joined(&missing_pdu.sender, &missing_pdu.room_id)?
{
error!("Sender is not joined {}", missing_pdu.kind);
// TODO: we probably should not be getting events for different rooms
//
// I think we need to keep going until we reach
// the incoming pdu so do not error here
continue;
}
let count = db.globals.next_count()?;
let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec();
let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// Since we know the state from event to event we can do the
// slightly more efficient force_state
db.rooms.force_state(
room_id,
state_snapshot
.iter()
.map(|(k, v)| {
(
k.clone(),
serde_json::to_vec(
&db.rooms
.get_pdu(v)
.expect("db err")
.expect("we know of all the pdus"),
)
.expect("serde can serialize pdus"),
)
})
.collect(),
)?;
// Now append the new missing event to DB it will be part of the
// next events state_snapshot
db.rooms.append_to_state(&pdu_id, &missing_pdu)?;
db.rooms.append_pdu(
&PduEvent::from(&*missing_pdu),
&utils::to_canonical_object(&*missing_pdu)
&missing_pdu,
&utils::to_canonical_object(&missing_pdu)
.expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
pdu_id.into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
// Only after the state is recorded in the DB can we update the state_snapshot
// This will update the state snapshot so it is correct next loop through
state_snapshot.insert(
(missing_pdu.kind(), missing_pdu.state_key()),
missing_pdu.event_id(),
);
accum_event_map.insert(missing_pdu.event_id(), missing_pdu);
}
Ok(false) => {
error!("{:?}", missing_pdu);
continue;
}
Err(e) => {
error!("{}", e);
// I think we need to keep going until we reach
// the incoming pdu so do not error here
continue; // The continue is not needed but to remind us that this is not an error
// Continue this inner for loop adding all the missing events
}
}
// All events between the event we know about and the incoming event need to be accounted
// for
} else {
// TODO: Some auth needs to be done for non state events
if !db
.rooms
.is_joined(&missing_pdu.sender, &missing_pdu.room_id)?
{
error!("Sender is not joined {}", missing_pdu.kind);
// I think we need to keep going until we reach
// the incoming pdu so do not error here
continue;
PrevEvents::Fork(pdus) => {
for missing_pdu in pdus.into_iter().rev() {
// For state events
if missing_pdu.state_key.is_some() {
let missing_pdu = missing_pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
missing_pdu.clone(),
&state_snapshot,
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(true) => {
// TODO: do we need this
assert_eq!(room_id, missing_pdu.room_id());
let count = db.globals.next_count()?;
let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms
.append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?;
db.rooms.append_pdu(
&PduEvent::from(&*missing_pdu),
&utils::to_canonical_object(&*missing_pdu)
.expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
// Only after the state is recorded in the DB can we update the state_snapshot
// This will update the state snapshot so it is correct next loop through
state_snapshot.insert(
(missing_pdu.kind(), missing_pdu.state_key()),
missing_pdu.event_id(),
);
accum_event_map.insert(missing_pdu.event_id(), missing_pdu);
}
Ok(false) => {
error!(
"apply missing fork: {}",
serde_json::to_string_pretty(&*missing_pdu).unwrap()
);
continue;
}
Err(e) => {
error!("fork state-res: {}", e);
// TODO: what to do when missing events fail (not incoming events)
}
}
} else {
// TODO: Some auth needs to be done for non state events
if !db
.rooms
.is_joined(&missing_pdu.sender, &missing_pdu.room_id)?
{
error!("fork Sender is not joined {}", missing_pdu.kind);
// TODO: we probably should not be getting events for different rooms
continue;
}
let count = db.globals.next_count()?;
let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_to_state(&pdu_id, &missing_pdu)?;
db.rooms.append_pdu(
&missing_pdu,
&utils::to_canonical_object(&missing_pdu)
.expect("Pdu is valid canonical object"),
count,
pdu_id.into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
// Continue this inner for loop adding all the missing events
}
}
}
let count = db.globals.next_count()?;
let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_to_state(&pdu_id, &missing_pdu)?;
db.rooms.append_pdu(
&missing_pdu,
&value,
count,
pdu_id.into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
// Continue this inner for loop adding all the missing events
}
}
// Back to the original incoming event
// If it is a non state event we still must add it and create a statehash for it
// If it is a non state event we still must add it and associate a statehash with the pdu_id
if value.get("state_key").is_none() {
// TODO: Some auth needs to be done for non state events
if !db.rooms.is_joined(&pdu.sender, room_id)? {
error!("Sender is not joined {}", pdu.kind);
// I think we need to keep going until we reach
// the incoming pdu so do not error here
resolved_map.insert(event_id, Err("Sender not found in room".into()));
continue;
}
@@ -700,6 +876,7 @@ pub async fn send_transaction_message_route<'a>(
// the original incoming event is the youngest child and so can be simply authed and append
// to the state
// If we have holes or a fork I am less sure what can be guaranteed about our state?
// Or what must be done to fix holes and forks?
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
@@ -728,7 +905,10 @@ pub async fn send_transaction_message_route<'a>(
}
Ok(false) => {
resolved_map.insert(event_id, Err("Failed event auth".into()));
error!("{:?}", pdu);
error!(
"auth failed: {}",
serde_json::to_string_pretty(&pdu).unwrap()
);
}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));

Loading…
Cancel
Save