diff --git a/Cargo.lock b/Cargo.lock index 0520b0d..a8fad68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,6 +1986,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" +source = "git+https://github.com/ruma/state-res#dca71f76eea9f1378a54e96e5fc98d71dfbf5dde" dependencies = [ "itertools", "maplit", diff --git a/Cargo.toml b/Cargo.toml index 757a9b8..1c97bdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] } -state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } +state-res = { git = "https://github.com/ruma/state-res", features = ["unstable-pre-spec", "gen-eventid"] } # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/database/rooms.rs b/src/database/rooms.rs index a9005f4..9178eeb 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -269,7 +269,7 @@ impl Rooms { pub fn force_state( &self, room_id: &RoomId, - state: &HashMap<(EventType, String), Vec>, + state: HashMap<(EventType, String), Vec>, ) -> Result<()> { let state_hash = self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; diff --git a/src/pdu.rs b/src/pdu.rs index 75ef492..f6ec415 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -17,7 +17,7 @@ use std::{ time::UNIX_EPOCH, }; -#[derive(Deserialize, Serialize, Debug)] +#[derive(Clone, Deserialize, Serialize, Debug)] pub struct PduEvent { pub event_id: EventId, pub room_id: RoomId, diff --git a/src/server_server.rs b/src/server_server.rs index 047847c..71f731f 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -24,7 +24,7 @@ use ruma::{ EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, convert::TryFrom, fmt::Debug, sync::Arc, @@ -390,6 +390,22 @@ pub async fn get_public_rooms_route( .into()) } +#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum PrevEvents { + Sequential(T), + Fork(Vec), +} + +impl PrevEvents { + pub fn new(id: &[T]) -> Self { + match id { + [] => panic!("All events must have previous event"), + [single_id] => Self::Sequential(single_id.clone()), + rest => Self::Fork(rest.to_vec()), + } + } +} + #[cfg_attr( feature = "conduit_bin", put("/_matrix/federation/v1/send/<_>", data = "") @@ -475,78 +491,140 @@ pub async fn send_transaction_message_route<'a>( // The events that must be resolved to catch up to the incoming event let mut missing = vec![]; - let mut seen = BTreeMap::new(); - - let mut prev_ids = pdu.prev_events.to_vec(); - // Don't kill our server with state-res - // TODO: set this at a reasonable level this is for debug/wip purposes - if prev_ids.len() > 5 { - resolved_map.insert( - event_id, - Err("Event has abnormally large prev_events count".into()), - ); - continue; - } + let mut seen = state_res::StateMap::new(); - // This is `while let Some(event_id) = prev_ids.pop()` but with a fancy continue + let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; + + // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue // in the case of a failed request to the server that sent the event 'inner: loop { - let id = if let Some(id) = prev_ids.pop() { - id - } else { - break 'inner; - }; + // TODO: if this is ever more that 1 at a time we must do actual + // full state resolution not just auth + match prev_ids.pop() { + Some(PrevEvents::Sequential(id)) => match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + // We know the state snapshot for this events parents so we can simply auth the + // incoming event and append to DB and append to state if it passes + Some(state_hash) => { + seen = db.rooms.state_full(&state_hash)?; + break 'inner; + } + // We need to fill in information about this event's `prev_events` (parents) + None => { + match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), + ) + .await + { + Ok(res) => { + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); + missing.push(PrevEvents::Sequential(prev_pdu)); + } + // We can't hard fail because there are some valid errors, just + // keep checking PDU's + // + // As an example a possible error + // {"errcode":"M_FORBIDDEN","error":"Host not in room."} + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + // We have to give up on this PDU + continue 'outer; + } + }; + } + }, + Some(PrevEvents::Fork(ids)) => { + error!( + "prev_events > 1: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + // Don't kill our server with state-res + // TODO: set this at a reasonable level this is for debug/wip purposes + if ids.len() > 5 { + error!( + "prev_events > 1: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + resolved_map.insert( + event_id, + Err("Previous events are too large for state-res".into()), + ); + continue 'outer; + } - match db - .rooms - .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? - { - // We know the state snapshot for this events parents so we can simply auth the - // incoming event and append to DB and append to state if it passes - Some(state_hash) => { - seen = db.rooms.state_full(&state_hash)?; - break 'inner; - } - // We need to fill in information about this event's `prev_events` (parents) - None => { - // We have a state event so we need info for state-res - match send_request( - &db.globals, - body.body.origin.clone(), - ruma::api::federation::event::get_event::v1::Request::new(&id), - ) - .await - { - Ok(res) => { - let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); - let prev_pdu = serde_json::from_value::( - serde_json::to_value(&val) - .expect("CanonicalJsonObj is a valid JsonValue"), + // We want this to stay unique incase the fork comes together? + let mut prev_fork_ids = BTreeSet::new(); + let mut missing_fork = vec![]; + for id in &ids { + match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + // We know the state snapshot for this events parents so we can simply auth the + // incoming event and append to DB and append to state if it passes + Some(state_hash) => { + seen = db.rooms.state_full(&state_hash)?; + break 'inner; + } + None => match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), ) - .expect("all ruma pdus are conduit pdus"); - - // TODO: do we need this - assert_eq!(room_id, &prev_pdu.room_id); - - prev_ids.extend(prev_pdu.prev_events.to_vec()); - missing.push(prev_pdu); - } - // We can't hard fail because there are some valid errors, just - // keep checking PDU's - // - // As an example a possible error - // {"errcode":"M_FORBIDDEN","error":"Host not in room."} - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - // We have to give up on this PDU - continue 'outer; + .await + { + Ok(res) => { + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + for id in &prev_pdu.prev_events { + prev_fork_ids.insert(id.clone()); + } + missing_fork.push(prev_pdu); + } + // We can't hard fail because there are some valid errors, just + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + // We have to give up on this PDU + continue 'outer; + } + }, } - }; + } + prev_ids.push(PrevEvents::new( + &prev_fork_ids.into_iter().collect::>(), + )); + missing.push(PrevEvents::new(&missing_fork)); + } + // All done finding missing events + None => { + break 'inner; } } } - // Now build up + // Now build up state let mut state_snapshot = seen .iter() .map(|(k, v)| (k.clone(), v.event_id.clone())) @@ -556,124 +634,222 @@ pub async fn send_transaction_message_route<'a>( .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) .collect::>>(); // TODO: this only accounts for sequentially missing events no holes will be filled - // and I'm still not sure what happens when fork introduces multiple `prev_events` + // and I'm still not sure what happens when a fork introduces multiple `prev_events` // // We need to go from oldest (furthest ancestor of the incoming event) to the // prev_event of the incoming event so we reverse the order oldest -> most recent for missing_pdu in missing.into_iter().rev() { - // For state events - if missing_pdu.state_key.is_some() { - let missing_pdu = missing_pdu.convert_for_state_res(); - match state_res::StateResolution::apply_event( - room_id, - &RoomVersionId::Version6, - missing_pdu.clone(), - &state_snapshot, - Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? - &db.rooms, - ) { - Ok(true) => { - // TODO: do we need this - assert_eq!(room_id, missing_pdu.room_id()); + match missing_pdu { + PrevEvents::Sequential(missing_pdu) => { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // // Since we know the state from event to event we can do the + // // slightly more efficient force_state + // db.rooms.force_state( + // room_id, + // state_snapshot + // .iter() + // .map(|(k, v)| { + // ( + // k.clone(), + // serde_json::to_vec( + // &db.rooms + // .get_pdu(v) + // .expect("db err") + // .expect("we know of all the pdus"), + // ) + // .expect("serde can serialize pdus"), + // ) + // }) + // .collect(), + // )?; + + db.rooms + .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; + // Now append the new missing event to DB it will be part of the + // next events state_snapshot + db.rooms.append_pdu( + &PduEvent::from(&*missing_pdu), + &utils::to_canonical_object(&*missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + } + Ok(false) => { + error!( + "apply missing: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("{}", e); + // This is not a fatal error but we do eventually need to handle + // events failing that are not the incoming events + // TODO: what to do when missing events fail (not incoming events) + } + } + // All events between the event we know about and the incoming event need to be accounted + // for + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms + // + // I think we need to keep going until we reach + // the incoming pdu so do not error here + continue; + } let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); - // Since we know the state from event to event we can do the - // slightly more efficient force_state - db.rooms.force_state( - room_id, - state_snapshot - .iter() - .map(|(k, v)| { - ( - k.clone(), - serde_json::to_vec( - &db.rooms - .get_pdu(v) - .expect("db err") - .expect("we know of all the pdus"), - ) - .expect("serde can serialize pdus"), - ) - }) - .collect(), - )?; - // Now append the new missing event to DB it will be part of the - // next events state_snapshot + db.rooms.append_to_state(&pdu_id, &missing_pdu)?; db.rooms.append_pdu( - &PduEvent::from(&*missing_pdu), - &utils::to_canonical_object(&*missing_pdu) + &missing_pdu, + &utils::to_canonical_object(&missing_pdu) .expect("Pdu is valid canonical object"), count, - pdu_id.clone().into(), + pdu_id.into(), &db.globals, &db.account_data, &db.admin, )?; - - // Only after the state is recorded in the DB can we update the state_snapshot - // This will update the state snapshot so it is correct next loop through - state_snapshot.insert( - (missing_pdu.kind(), missing_pdu.state_key()), - missing_pdu.event_id(), - ); - accum_event_map.insert(missing_pdu.event_id(), missing_pdu); - } - Ok(false) => { - error!("{:?}", missing_pdu); - continue; - } - Err(e) => { - error!("{}", e); - // I think we need to keep going until we reach - // the incoming pdu so do not error here - continue; // The continue is not needed but to remind us that this is not an error + // Continue this inner for loop adding all the missing events } } - // All between the event we know about and the incoming event need to be accounted - // for - } else { - // TODO: Some auth needs to be done for non state events - if !db - .rooms - .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? - { - error!("Sender is not joined {}", missing_pdu.kind); - // I think we need to keep going until we reach - // the incoming pdu so do not error here - continue; + PrevEvents::Fork(pdus) => { + for missing_pdu in pdus.into_iter().rev() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms + .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; + db.rooms.append_pdu( + &PduEvent::from(&*missing_pdu), + &utils::to_canonical_object(&*missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + } + Ok(false) => { + error!( + "apply missing fork: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("fork state-res: {}", e); + // TODO: what to do when missing events fail (not incoming events) + } + } + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("fork Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms + continue; + } + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, &missing_pdu)?; + db.rooms.append_pdu( + &missing_pdu, + &utils::to_canonical_object(&missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + // Continue this inner for loop adding all the missing events + } + } } - - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &missing_pdu)?; - db.rooms.append_pdu( - &missing_pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - // Continue this inner for loop adding all the missing events } } // Back to the original incoming event - // If it is a non state event we still must add it and create a statehash for it + // If it is a non state event we still must add it and associate a statehash with the pdu_id if value.get("state_key").is_none() { // TODO: Some auth needs to be done for non state events if !db.rooms.is_joined(&pdu.sender, room_id)? { error!("Sender is not joined {}", pdu.kind); - // I think we need to keep going until we reach - // the incoming pdu so do not error here + resolved_map.insert(event_id, Err("Sender not found in room".into())); continue; } @@ -700,6 +876,7 @@ pub async fn send_transaction_message_route<'a>( // the original incoming event is the youngest child and so can be simply authed and append // to the state // If we have holes or a fork I am less sure what can be guaranteed about our state? + // Or what must be done to fix holes and forks? match state_res::StateResolution::apply_event( room_id, &RoomVersionId::Version6, @@ -728,7 +905,10 @@ pub async fn send_transaction_message_route<'a>( } Ok(false) => { resolved_map.insert(event_id, Err("Failed event auth".into())); - error!("{:?}", pdu); + error!( + "auth failed: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); } Err(err) => { resolved_map.insert(event_id, Err(err.to_string()));