diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 6d3a690..c713a9b 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -667,7 +667,7 @@ async fn join_room_by_id_helper( // this is a `state_res::StateEvent` that holds a `ruma::Pdu` let pdu = event_map .get(ev_id) - .expect("Found event_id in sorted events that is not in resolved state"); + .expect("found event_id in sorted events that is not in resolved state"); // We do not rebuild the PDU in this case only insert to DB let count = db.globals.next_count()?; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 394c7ea..4542c65 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -135,6 +135,29 @@ impl Rooms { .collect::>>() } + /// Returns all state entries we know of from a specific event_id onward. + /// The PDU's are ordered most recent to least recent. + /// + /// This is used to resolve a forward extremity. We have the other servers + /// arm now we need ours. + /// TODO: Probably return Vec> + pub fn state_from(&self, room_id: &RoomId, event_id: &EventId) -> Result> { + let mut last_ids = self.get_pdu_leaves(room_id)?; + let mut collected = vec![]; + while let Some(find_id) = last_ids.pop() { + if event_id == &find_id { + break; + } + + if let Some(pdu) = self.get_pdu(&find_id)? { + last_ids.extend(pdu.prev_events.to_vec()); + collected.push(pdu); + } + } + + Ok(collected) + } + /// Returns all state entries for this type. pub fn state_type( &self, @@ -253,6 +276,36 @@ impl Rooms { .is_some()) } + /// Force the creation of a new StateHash and insert it into the db. This also associates + /// the given `pdu` with the new StateHash. + /// + /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. + pub fn force_state_with_pdu( + &self, + new_pdu_id: &[u8], + room_id: &RoomId, + state: HashMap<(EventType, String), Vec>, + ) -> Result<()> { + let state_hash = + self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; + let mut prefix = state_hash.to_vec(); + prefix.push(0xff); + + for ((event_type, state_key), pdu_id) in state { + let mut state_id = prefix.clone(); + state_id.extend_from_slice(&event_type.as_ref().as_bytes()); + state_id.push(0xff); + state_id.extend_from_slice(&state_key.as_bytes()); + self.stateid_pduid.insert(state_id, pdu_id.to_vec())?; + } + + self.pduid_statehash.insert(new_pdu_id, &*state_hash)?; + self.roomid_statehash + .insert(room_id.as_bytes(), &*state_hash)?; + + Ok(()) + } + /// Force the creation of a new StateHash and insert it into the db. /// /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. diff --git a/src/server_server.rs b/src/server_server.rs index 144ea0a..e271891 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -388,12 +388,12 @@ pub async fn get_public_rooms_route( } #[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum AuthEvents { +pub enum PrevEvents { Sequential(T), Fork(Vec), } -impl IntoIterator for AuthEvents { +impl IntoIterator for PrevEvents { type Item = T; type IntoIter = std::vec::IntoIter; @@ -405,7 +405,7 @@ impl IntoIterator for AuthEvents { } } -impl AuthEvents { +impl PrevEvents { pub fn new(id: &[T]) -> Self { match id { [] => panic!("All events must have previous event"), @@ -478,15 +478,15 @@ pub async fn send_transaction_message_route<'a>( } let pdu = &body.pdus[pdu_idx]; pdu_idx += 1; - // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. + // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. // state-res checks signatures - 2. Passes signature checks, otherwise event is dropped. - // 3. Passes hash checks, otherwise it is redacted before being processed further. + // TODO: redact event if hashing fails let (event_id, value) = crate::pdu::process_incoming_pdu(pdu); - let pdu = serde_json::from_value::( + let mut pdu = serde_json::from_value::( serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"), ) .expect("all ruma pdus are conduit pdus"); @@ -501,23 +501,31 @@ pub async fn send_transaction_message_route<'a>( // The events that must be resolved to catch up to the incoming event let mut missing = vec![]; let mut seen = state_res::StateMap::new(); + let mut seen_id = None; - let mut prev_ids = vec![AuthEvents::new(&pdu.prev_events)]; + let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue // in the case of a failed request to the server that sent the event 'inner: loop { - // TODO: if this is ever more than 1 at a time we must do actual - // full state resolution not just auth match prev_ids.pop() { - Some(AuthEvents::Sequential(id)) => match db + Some(PrevEvents::Sequential(id)) => match db .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { - // We know the state snapshot for this events parents so we can simply auth the - // incoming event and append to DB and append to state if it passes + // We found a common ancestor Some(state_hash) => { + seen_id = Some(id.clone()); seen = db.rooms.state_full(&state_hash)?; + if let Some(pdu) = db.rooms.get_pdu(&id)? { + if pdu.state_key.is_some() { + // This becomes the state after the common event + seen.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu, + ); + } + } break 'inner; } // We need to fill in information about this event's `prev_events` (parents) @@ -540,8 +548,8 @@ pub async fn send_transaction_message_route<'a>( // TODO: do we need this assert_eq!(room_id, &prev_pdu.room_id); - prev_ids.push(AuthEvents::new(&prev_pdu.prev_events)); - missing.push(AuthEvents::Sequential(prev_pdu)); + prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); + missing.push(PrevEvents::Sequential(prev_pdu)); } // We can't hard fail because there are some valid errors, just // keep checking PDU's @@ -556,7 +564,7 @@ pub async fn send_transaction_message_route<'a>( }; } }, - Some(AuthEvents::Fork(ids)) => { + Some(PrevEvents::Fork(ids)) => { error!( "prev_events > 1: {}", serde_json::to_string_pretty(&pdu).unwrap() @@ -583,10 +591,19 @@ pub async fn send_transaction_message_route<'a>( .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { - // We know the state snapshot for this events parents so we can simply auth the - // incoming event and append to DB and append to state if it passes + // We found a common ancestor Some(state_hash) => { + seen_id = Some(id.clone()); seen = db.rooms.state_full(&state_hash)?; + if let Some(pdu) = db.rooms.get_pdu(&id)? { + if pdu.state_key.is_some() { + // This becomes the state after the common event + seen.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu, + ); + } + } break 'inner; } None => match send_request( @@ -621,10 +638,10 @@ pub async fn send_transaction_message_route<'a>( }, } } - prev_ids.push(AuthEvents::new( + prev_ids.push(PrevEvents::new( &prev_fork_ids.into_iter().collect::>(), )); - missing.push(AuthEvents::new(&missing_fork)); + missing.push(PrevEvents::new(&missing_fork)); } // All done finding missing events None => { @@ -633,49 +650,94 @@ pub async fn send_transaction_message_route<'a>( } } + // We can treat this event as sequential and simply apply it against the current state of the room + // because we know that state + if missing.is_empty() { + // Back to the original incoming event + // If it is a non state event we still must add it and associate a statehash with the pdu_id + if value.get("state_key").is_none() { + // TODO: Some auth needs to be done for non state events + if !db.rooms.is_joined(&pdu.sender, room_id)? { + error!("Sender is not joined {}", pdu.kind); + resolved_map.insert(event_id, Err("Sender not found in room".into())); + continue 'outer; + } + + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue 'outer; + } else { + let incoming = pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + incoming.clone(), + &seen + .iter() + .map(|(k, v)| (k.clone(), v.event_id.clone())) + .collect::>(), + Some( + seen.iter() + .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) + .collect::>(), + ), // TODO: make mut and keep around, this is all the auth events + &db.rooms, + ) { + Ok(true) => { + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue 'outer; + } + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".to_string())); + error!("Failed sequential event auth for incoming"); + continue 'outer; + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + continue 'outer; + } + } + } + } + + // Well, now we have to actually do a bunch of work :( + // The steps are as follows + // 1. Rebuild the sending servers forward extremity, ignoring our own fork + // a) iterate "oldest" -> most recent authenticating each event with the state after the previous + // b) build a `snapshot_map` containing the state after the event for each missing event (EventId -> StateMap) + // 2. Build our side of the fork (TODO do we have to re-auth these, is state at an event relative to the server its from) + // 3. resolve the two states (our current with the state after the most recent missing event) + // Now build up state let mut state_snapshot = seen .iter() .map(|(k, v)| (k.clone(), v.event_id.clone())) - .collect(); - let mut accum_event_map = seen + .collect::>(); + + // TODO: So this is super memory inefficient we clone the state_snapshot every time + // we need it for state events and non + let mut snapshot_map = BTreeMap::new(); + snapshot_map.insert(seen_id.clone().unwrap(), state_snapshot.clone()); + + let accum_event_map = seen .iter() .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) - .collect::>>(); - - if !missing.is_empty() { - // This is the state at incoming pdu - state_snapshot = match state_res::StateResolution::resolve_incoming( - room_id, - &RoomVersionId::Version6, - &state_snapshot, + .chain( missing .iter() .cloned() .flatten() - .filter(|pdu| pdu.state_key.is_some()) // remove non state events - .map(|pdu| ((pdu.kind, pdu.state_key.unwrap()), pdu.event_id)) - .collect(), - Some(accum_event_map), - &db.rooms, - ) { - Ok(res) => res, - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - error!("{}", err); - continue; - } - } - } + .map(|pdu| (pdu.event_id.clone(), pdu.convert_for_state_res())), + ) + .collect::>>(); - // TODO: this only accounts for sequentially missing events no holes will be filled - // and I'm still not sure what happens when a fork introduces multiple `prev_events` - // - // We need to go from oldest (furthest ancestor of the incoming event) to the - // prev_event of the incoming event so we reverse the order oldest -> most recent - for missing_pdu in missing.into_iter().rev() { + // 4. Passes authorization rules based on the event's auth events, otherwise it is rejected. + // 5. Passes authorization rules based on the state at the event, otherwise it is rejected. + for missing_pdu in missing.iter().rev() { match missing_pdu { - AuthEvents::Sequential(missing_pdu) => { + PrevEvents::Sequential(missing_pdu) => { // For state events if missing_pdu.state_key.is_some() { let missing_pdu = missing_pdu.convert_for_state_res(); @@ -691,46 +753,9 @@ pub async fn send_transaction_message_route<'a>( // TODO: do we need this assert_eq!(room_id, missing_pdu.room_id()); - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - // // Since we know the state from event to event we can do the - // // slightly more efficient force_state - // db.rooms.force_state( - // room_id, - // state_snapshot - // .iter() - // .map(|(k, v)| { - // ( - // k.clone(), - // serde_json::to_vec( - // &db.rooms - // .get_pdu(v) - // .expect("db err") - // .expect("we know of all the pdus"), - // ) - // .expect("serde can serialize pdus"), - // ) - // }) - // .collect(), - // )?; - - db.rooms - .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; - // Now append the new missing event to DB it will be part of the - // next events state_snapshot - db.rooms.append_pdu( - &PduEvent::from(&*missing_pdu), - &utils::to_canonical_object(&*missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.clone().into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + // We can't add to DB yet since we don't know if it passes + // current room state + // append_state(&db, &PduEvent::from(&*missing_pdu))?; // Only after the state is recorded in the DB can we update the state_snapshot // This will update the state snapshot so it is correct next loop through @@ -738,7 +763,8 @@ pub async fn send_transaction_message_route<'a>( (missing_pdu.kind(), missing_pdu.state_key()), missing_pdu.event_id(), ); - accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + // Keep track of the state after for resolution + snapshot_map.insert(missing_pdu.event_id(), state_snapshot.clone()); } Ok(false) => { error!( @@ -754,200 +780,302 @@ pub async fn send_transaction_message_route<'a>( // TODO: what to do when missing events fail (not incoming events) } } - // All events between the event we know about and the incoming event need to be accounted - // for } else { // TODO: Some auth needs to be done for non state events if !db .rooms .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? { - error!("Sender is not joined {}", missing_pdu.kind); + error!("fork Sender is not joined {}", missing_pdu.kind); // TODO: we probably should not be getting events for different rooms - // - // I think we need to keep going until we reach - // the incoming pdu so do not error here continue; } - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &missing_pdu)?; - db.rooms.append_pdu( - &missing_pdu, - &utils::to_canonical_object(&missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + // TODO: a better way to signal non state events... + snapshot_map.insert(missing_pdu.event_id.clone(), state_snapshot.clone()); // Continue this inner for loop adding all the missing events } } - AuthEvents::Fork(pdus) => { - for missing_pdu in pdus.into_iter().rev() { - // For state events - if missing_pdu.state_key.is_some() { - let missing_pdu = missing_pdu.convert_for_state_res(); - match state_res::StateResolution::apply_event( - room_id, - &RoomVersionId::Version6, - missing_pdu.clone(), - &state_snapshot, - Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? - &db.rooms, - ) { - Ok(true) => { - // TODO: do we need this - assert_eq!(room_id, missing_pdu.room_id()); - - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms - .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; - db.rooms.append_pdu( - &PduEvent::from(&*missing_pdu), - &utils::to_canonical_object(&*missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.clone().into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - - // Only after the state is recorded in the DB can we update the state_snapshot - // This will update the state snapshot so it is correct next loop through - state_snapshot.insert( - (missing_pdu.kind(), missing_pdu.state_key()), - missing_pdu.event_id(), - ); - accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + PrevEvents::Fork(pdus) => { + let mut state_sets = BTreeSet::new(); + for pdu in pdus { + let mut state_at = state_snapshot.clone(); + if pdu.state_key.is_some() { + state_at.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu.event_id.clone(), + ); + } + state_sets.insert(state_at); + } + + if state_sets.len() <= 1 { + for missing_pdu in pdus.iter() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + snapshot_map + .insert(missing_pdu.event_id(), state_snapshot.clone()); + } + Ok(false) => { + error!( + "apply missing fork: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("fork state-res: {}", e); + // TODO: what to do when missing events fail (not incoming events) + } } - Ok(false) => { - error!( - "apply missing fork: {}", - serde_json::to_string_pretty(&*missing_pdu).unwrap() - ); + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("fork Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms continue; } - Err(e) => { - error!("fork state-res: {}", e); - // TODO: what to do when missing events fail (not incoming events) + snapshot_map + .insert(missing_pdu.event_id.clone(), state_snapshot.clone()); + // Continue this inner for loop adding all the missing events + } + } + } else { + match state_res::StateResolution::resolve( + room_id, + &RoomVersionId::Version6, + &state_sets.into_iter().collect::>(), + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(resolved) => { + for id in resolved + .values() + .filter(|id| pdus.iter().any(|pduid| pduid.event_id == **id)) + { + snapshot_map.insert(id.clone(), resolved.clone()); } } - } else { - // TODO: Some auth needs to be done for non state events - if !db - .rooms - .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? - { - error!("fork Sender is not joined {}", missing_pdu.kind); - // TODO: we probably should not be getting events for different rooms - continue; + Err(err) => { + error!("{}", err); } - - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &missing_pdu)?; - db.rooms.append_pdu( - &missing_pdu, - &utils::to_canonical_object(&missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - // Continue this inner for loop adding all the missing events } } } } } - // Back to the original incoming event - // If it is a non state event we still must add it and associate a statehash with the pdu_id - if value.get("state_key").is_none() { - // TODO: Some auth needs to be done for non state events - if !db.rooms.is_joined(&pdu.sender, room_id)? { - error!("Sender is not joined {}", pdu.kind); - resolved_map.insert(event_id, Err("Sender not found in room".into())); - continue; + // We know these events are valid for each state at the event + // we have already authed them + let known_fork = if let Some(id) = seen_id { + db.rooms.state_from(room_id, &id)? + } else { + vec![] + }; + for pdu in known_fork.clone().into_iter().rev() { + if pdu.state_key.is_some() { + seen.insert((pdu.kind.clone(), pdu.state_key.clone().unwrap()), pdu); } + } - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - resolved_map.insert(event_id, Ok::<(), String>(())); - continue; + let current_room_state = seen + .into_iter() + .map(|(k, v)| (k, v.event_id)) + .collect::>(); + + let mut state_sets = BTreeSet::new(); + state_sets.insert(current_room_state.clone()); + + // The first item is the most recent + if let Some(fork) = missing.first() { + for pdu in fork.clone().into_iter() { + if let Some(map) = snapshot_map.get(&pdu.event_id) { + state_sets.insert(map.clone()); + } + } } + let mut state_sets = state_sets.into_iter().collect::>(); + // If we have actual differences resolve + if state_sets.len() > 1 { + // Set the incoming event to have parents from both forks + // ours/theirs + pdu.prev_events = missing + .first() + .cloned() + .into_iter() + .flatten() + .map(|pdu| pdu.event_id) + .chain(known_fork.first().cloned().map(|pdu| pdu.event_id)) + .collect(); + + // The incoming event is a child of both forks now + for set in &mut state_sets { + set.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu.event_id.clone(), + ); + } + + // If we have holes or a fork I am less sure what can be guaranteed about our state? + // Or what must be done to fix holes and forks? + match state_res::StateResolution::resolve( + room_id, + &RoomVersionId::Version6, + &state_sets, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(resolved) if resolved.values().any(|id| id == &event_id) => { + for resolved_missing in missing + .into_iter() + .rev() // We want the oldest pdu's first + .flatten() + .filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id)) + { + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( + &resolved_missing, + &crate::utils::to_canonical_object(&resolved_missing) + .expect("PDU is valid canonical JSON"), + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + } + + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // Since we know the state we force_state + // saving the incoming pdu's id with our new state + db.rooms.force_state_with_pdu( + &*pdu_id, + room_id, + resolved + .iter() + .map(|(k, v)| { + ( + k.clone(), + serde_json::to_vec( + &db.rooms + .get_pdu(v) + .expect("db err") + .expect("we know of all the pdus"), + ) + .expect("serde can serialize pdus"), + ) + }) + .collect(), + )?; + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + resolved_map.insert(event_id, Ok::<(), String>(())); + } + Ok(resolved) => { + for resolved_missing in missing + .into_iter() + .flatten() + .filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id)) + { + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( + &resolved_missing, + &crate::utils::to_canonical_object(&resolved_missing) + .expect("PDU is valid canonical JSON"), + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + } + resolved_map.insert(event_id, Err("Failed event auth".into())); + error!( + "auth failed: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + } + } // If we have iterated through the incoming missing events sequentially we know that - // the original incoming event is the youngest child and so can be simply authed and append + // the original incoming event is the youngest child and so can be simply authed and appended // to the state - // If we have holes or a fork I am less sure what can be guaranteed about our state? - // Or what must be done to fix holes and forks? - match state_res::StateResolution::apply_event( - room_id, - &RoomVersionId::Version6, - pdu.convert_for_state_res(), // We know this a state event - &state_snapshot, - Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? - &db.rooms, - ) { - Ok(true) => { - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + } else { + // If it is a non state event we still must add it and associate a statehash with the pdu_id + if pdu.state_key.is_none() { + // TODO: Some auth needs to be done for non state events + if !db.rooms.is_joined(&pdu.sender, room_id)? { + error!("Sender is not joined {}", pdu.kind); + resolved_map.insert(event_id, Err("Sender not found in room".into())); + continue 'outer; + } + + append_state(&db, &pdu)?; resolved_map.insert(event_id, Ok::<(), String>(())); - } - Ok(false) => { - resolved_map.insert(event_id, Err("Failed event auth".into())); - error!( - "auth failed: {}", - serde_json::to_string_pretty(&pdu).unwrap() - ); - } - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - error!("{}", err); + } else { + let incoming = pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + incoming, + ¤t_room_state, + Some(accum_event_map), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + } + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".to_string())); + error!("Failed sequential event auth for incoming"); + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + } + } } } } @@ -955,6 +1083,24 @@ pub async fn send_transaction_message_route<'a>( Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) } +fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { + let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, pdu)?; + db.rooms.append_pdu( + pdu, + &utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + ) +} + #[cfg_attr( feature = "conduit_bin", post("/_matrix/federation/v1/get_missing_events/<_>", data = "")