diff --git a/Cargo.lock b/Cargo.lock index 5062b8c..a5cdedb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,15 +21,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi 0.3.9", -] - [[package]] name = "arrayref" version = "0.3.6" @@ -182,19 +173,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" -dependencies = [ - "libc", - "num-integer", - "num-traits", - "time 0.1.44", - "winapi 0.3.9", -] - [[package]] name = "color_quant" version = "1.1.0" @@ -254,7 +232,7 @@ version = "0.15.0-dev" source = "git+https://github.com/SergioBenitez/cookie-rs.git?rev=1c3ca83#1c3ca838543b60a4448d279dc4b903cc7a2bc22a" dependencies = [ "percent-encoding", - "time 0.2.23", + "time", "version_check", ] @@ -575,19 +553,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "generator" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" -dependencies = [ - "cc", - "libc", - "log", - "rustc_version", - "winapi 0.3.9", -] - [[package]] name = "getrandom" version = "0.1.15" @@ -596,7 +561,7 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -886,9 +851,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "linked-hash-map" @@ -914,19 +879,6 @@ dependencies = [ "cfg-if 0.1.10", ] -[[package]] -name = "loom" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" -dependencies = [ - "cfg-if 0.1.10", - "generator", - "scoped-tls", - "serde", - "serde_json", -] - [[package]] name = "lru-cache" version = "0.1.2" @@ -948,15 +900,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - [[package]] name = "matches" version = "0.1.8" @@ -1468,31 +1411,6 @@ dependencies = [ "syn", ] -[[package]] -name = "regex" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" -dependencies = [ - "regex-syntax", -] - -[[package]] -name = "regex-automata" -version = 
"0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" -dependencies = [ - "byteorder", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" - [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1585,7 +1503,7 @@ dependencies = [ "rocket_http", "serde", "state", - "time 0.2.23", + "time", "tokio", "ubyte", "version_check", @@ -1622,7 +1540,7 @@ dependencies = [ "ref-cast", "smallvec", "state", - "time 0.2.23", + "time", "tokio", "tokio-rustls", "uncased", @@ -1633,7 +1551,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "js_int", @@ -1651,7 +1569,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "http", "percent-encoding", @@ -1666,7 +1584,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1677,7 +1595,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "ruma-api", "ruma-common", @@ -1691,7 +1609,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "http", @@ -1710,7 +1628,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "maplit", @@ -1723,7 +1641,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-common", @@ -1737,7 
+1655,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1748,7 +1666,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.0.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-api", @@ -1763,7 +1681,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "paste", "rand", @@ -1777,7 +1695,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro2", "quote", @@ -1788,7 +1706,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.1.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "serde", ] @@ -1796,7 +1714,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.2.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "form_urlencoded", "itoa", @@ -1809,7 +1727,7 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1820,7 +1738,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-dev.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "base64 0.12.3", "ring", @@ -1948,18 +1866,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" +checksum = 
"06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" +checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" dependencies = [ "proc-macro2", "quote", @@ -1995,16 +1913,6 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" -[[package]] -name = "sharded-slab" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" -dependencies = [ - "lazy_static", - "loom", -] - [[package]] name = "signal-hook-registry" version = "1.2.2" @@ -2078,17 +1986,15 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#99214e6fa6b9843b0d9e1f6ef0698d7fdb234fb2" +source = "git+https://github.com/ruma/state-res?branch=conflict#e4ba824806e7b780c23ca8120b57c5a7e4ab787d" dependencies = [ "itertools", - "js_int", "maplit", "ruma", "serde", "serde_json", "thiserror", "tracing", - "tracing-subscriber", ] [[package]] @@ -2206,26 +2112,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi 0.3.9", -] - [[package]] name = "time" version = "0.2.23" @@ -2407,49 +2293,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", -] - [[package]] name = "trust-dns-proto" version = "0.19.6" @@ -2599,12 +2442,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasm-bindgen" version = "0.2.69" diff --git a/Cargo.toml b/Cargo.toml index b1dec17..8d94770 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,13 +18,13 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f33 #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "e8882fe8142d7b55ed4c8ccc6150946945f9e237" } +ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "ee814aa84934530d76f5e4b275d739805b49bdef" } # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "unstable-join" } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] } -state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } +state-res = { git = "https://github.com/ruma/state-res", branch = "conflict", features = ["unstable-pre-spec", "gen-eventid"] } # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/client_server/media.rs b/src/client_server/media.rs index e6bd182..f31bff1 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -45,7 +45,8 @@ pub async fn create_content_route( db.flush().await?; - Ok(create_content::Response { content_uri: mxc }.into()) + // TODO: blurhash is the other field not sure if that is important + Ok(create_content::Response::new(mxc).into()) } #[cfg_attr( diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 6d3a690..c713a9b 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -667,7 +667,7 @@ async fn join_room_by_id_helper( // this is a `state_res::StateEvent` that holds a `ruma::Pdu` let pdu = event_map .get(ev_id) - .expect("Found event_id in sorted events that is not in resolved state"); + .expect("found event_id in sorted events that is not in resolved state"); // We do not rebuild the PDU in this case only insert to DB let count = db.globals.next_count()?; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index fb139a6..4542c65 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -36,16 +36,6 @@ use super::admin::AdminCommand; /// hashing the entire state. pub type StateHashId = IVec; -/// An enum that represents the two valid states when searching -/// for an events "parent". -/// -/// An events parent is any event we are aware of that is part of -/// the events `prev_events` array. 
-pub(crate) enum ClosestParent { - Append, - Insert(u64), -} - #[derive(Clone)] pub struct Rooms { pub edus: edus::RoomEdus, @@ -145,6 +135,29 @@ impl Rooms { .collect::>>() } + /// Returns all state entries we know of from a specific event_id onward. + /// The PDU's are ordered most recent to least recent. + /// + /// This is used to resolve a forward extremity. We have the other servers + /// arm now we need ours. + /// TODO: Probably return Vec> + pub fn state_from(&self, room_id: &RoomId, event_id: &EventId) -> Result> { + let mut last_ids = self.get_pdu_leaves(room_id)?; + let mut collected = vec![]; + while let Some(find_id) = last_ids.pop() { + if event_id == &find_id { + break; + } + + if let Some(pdu) = self.get_pdu(&find_id)? { + last_ids.extend(pdu.prev_events.to_vec()); + collected.push(pdu); + } + } + + Ok(collected) + } + /// Returns all state entries for this type. pub fn state_type( &self, @@ -205,7 +218,7 @@ impl Rooms { }) } - /// Returns the last state hash key added to the db. + /// Returns the state hash key for the given pdu. pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result> { Ok(self.pduid_statehash.get(pdu_id)?) } @@ -263,6 +276,36 @@ impl Rooms { .is_some()) } + /// Force the creation of a new StateHash and insert it into the db. This also associates + /// the given `pdu` with the new StateHash. + /// + /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. + pub fn force_state_with_pdu( + &self, + new_pdu_id: &[u8], + room_id: &RoomId, + state: HashMap<(EventType, String), Vec>, + ) -> Result<()> { + let state_hash = + self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; + let mut prefix = state_hash.to_vec(); + prefix.push(0xff); + + for ((event_type, state_key), pdu_id) in state { + let mut state_id = prefix.clone(); + state_id.extend_from_slice(&event_type.as_ref().as_bytes()); + state_id.push(0xff); + state_id.extend_from_slice(&state_key.as_bytes()); + self.stateid_pduid.insert(state_id, pdu_id.to_vec())?; + } + + self.pduid_statehash.insert(new_pdu_id, &*state_hash)?; + self.roomid_statehash + .insert(room_id.as_bytes(), &*state_hash)?; + + Ok(()) + } + /// Force the creation of a new StateHash and insert it into the db. /// /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. @@ -281,7 +324,7 @@ impl Rooms { state_id.extend_from_slice(&event_type.as_ref().as_bytes()); state_id.push(0xff); state_id.extend_from_slice(&state_key.as_bytes()); - self.stateid_pduid.insert(state_id, pdu_id)?; + self.stateid_pduid.insert(state_id, pdu_id.to_vec())?; } self.roomid_statehash @@ -411,54 +454,6 @@ impl Rooms { } } - /// Recursively search for a PDU from our DB that is also in the - /// `prev_events` field of the incoming PDU. - /// - /// First we check if the last PDU inserted to the given room is a parent - /// if not we recursively check older `prev_events` to insert the incoming - /// event after. - pub(crate) fn get_latest_pduid_before( - &self, - room: &RoomId, - incoming_prev_ids: &[EventId], - their_state: &BTreeMap>, - ) -> Result> { - match self.pduid_pdu.scan_prefix(room.as_bytes()).last() { - Some(Ok(val)) - if incoming_prev_ids.contains( - &serde_json::from_slice::(&val.1) - .map_err(|_| { - Error::bad_database("last DB entry contains invalid PDU bytes") - })? - .event_id, - ) => - { - Ok(Some(ClosestParent::Append)) - } - _ => { - let mut prev_ids = incoming_prev_ids.to_vec(); - while let Some(id) = prev_ids.pop() { - match self.get_pdu_id(&id)? 
{ - Some(pdu_id) => { - return Ok(Some(ClosestParent::Insert(self.pdu_count(&pdu_id)?))); - } - None => { - prev_ids.extend(their_state.get(&id).map_or( - Err(Error::BadServerResponse( - "Failed to find previous event for PDU in state", - )), - // `prev_event_ids` will return an empty Vec instead of failing - // so it works perfect for our use here - |pdu| Ok(pdu.prev_event_ids()), - )?); - } - } - } - Ok(None) - } - } - } - /// Returns the leaf pdus of a room. pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result> { let mut prefix = room_id.as_bytes().to_vec(); diff --git a/src/pdu.rs b/src/pdu.rs index 75ef492..f6ec415 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -17,7 +17,7 @@ use std::{ time::UNIX_EPOCH, }; -#[derive(Deserialize, Serialize, Debug)] +#[derive(Clone, Deserialize, Serialize, Debug)] pub struct PduEvent { pub event_id: EventId, pub room_id: RoomId, diff --git a/src/server_server.rs b/src/server_server.rs index da046d3..e271891 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,7 +1,4 @@ -use crate::{ - client_server, database::rooms::ClosestParent, utils, ConduitResult, Database, Error, PduEvent, - Result, Ruma, -}; +use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; use log::{error, warn}; @@ -21,10 +18,10 @@ use ruma::{ OutgoingRequest, }, directory::{IncomingFilter, IncomingRoomNetwork}, - EventId, RoomId, ServerName, ServerSigningKeyId, UserId, + EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, convert::TryFrom, fmt::Debug, sync::Arc, @@ -390,6 +387,34 @@ pub async fn get_public_rooms_route( .into()) } +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum PrevEvents { + Sequential(T), + Fork(Vec), +} + +impl IntoIterator for PrevEvents { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + match self { + Self::Sequential(item) => vec![item].into_iter(), + Self::Fork(list) => list.into_iter(), + } + } +} + +impl PrevEvents { + pub fn new(id: &[T]) -> Self { + match id { + [] => panic!("All events must have previous event"), + [single_id] => Self::Sequential(single_id.clone()), + rest => Self::Fork(rest.to_vec()), + } + } +} + #[cfg_attr( feature = "conduit_bin", put("/_matrix/federation/v1/send/<_>", data = "") @@ -445,16 +470,23 @@ pub async fn send_transaction_message_route<'a>( // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. let mut resolved_map = BTreeMap::new(); - for pdu in &body.pdus { - // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. + let mut pdu_idx = 0; + // This is `for pdu in &body.pdus` but we need to do fancy continue/break so we need loop labels + 'outer: loop { + if pdu_idx == body.pdus.len() { + break 'outer; + } + let pdu = &body.pdus[pdu_idx]; + pdu_idx += 1; + // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. // state-res checks signatures - 2. Passes signature checks, otherwise event is dropped. - // 3. Passes hash checks, otherwise it is redacted before being processed further. 
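Illustrative sketch, not part of the patch: the `PrevEvents` helper added above in src/server_server.rs, reproduced standalone with its generic parameters restored (the `<T>` / `<T: Clone>` bounds are an assumption inferred from the fact that `new` clones its input), plus a tiny usage example. `String` stands in for `ruma::EventId` only so the snippet compiles on its own.

// Sketch only: one prev_event means linear history (Sequential),
// several prev_events mean a fork that state resolution must reconcile (Fork).
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum PrevEvents<T> {
    Sequential(T),
    Fork(Vec<T>),
}

impl<T: Clone> PrevEvents<T> {
    pub fn new(id: &[T]) -> Self {
        match id {
            [] => panic!("All events must have previous event"),
            [single_id] => Self::Sequential(single_id.clone()),
            rest => Self::Fork(rest.to_vec()),
        }
    }
}

fn main() {
    // One parent: handled as a sequential event.
    let linear = PrevEvents::new(&["$a:example.org".to_owned()]);
    assert!(matches!(linear, PrevEvents::Sequential(_)));

    // Two parents: a fork that needs state-res.
    let fork = PrevEvents::new(&["$a:example.org".to_owned(), "$b:example.org".to_owned()]);
    assert!(matches!(fork, PrevEvents::Fork(ids) if ids.len() == 2));
}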
+ // TODO: redact event if hashing fails let (event_id, value) = crate::pdu::process_incoming_pdu(pdu); - let pdu = serde_json::from_value::( + let mut pdu = serde_json::from_value::( serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"), ) .expect("all ruma pdus are conduit pdus"); @@ -466,128 +498,469 @@ pub async fn send_transaction_message_route<'a>( continue; } - // If it is not a state event, we can skip state-res... maybe - if value.get("state_key").is_none() { - if !db.rooms.is_joined(&pdu.sender, room_id)? { - warn!("Sender is not joined {}", pdu.kind); - resolved_map.insert(event_id, Err("User is not in this room".into())); - continue; - } - - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); + // The events that must be resolved to catch up to the incoming event + let mut missing = vec![]; + let mut seen = state_res::StateMap::new(); + let mut seen_id = None; + + let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; + + // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue + // in the case of a failed request to the server that sent the event + 'inner: loop { + match prev_ids.pop() { + Some(PrevEvents::Sequential(id)) => match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + // We found a common ancestor + Some(state_hash) => { + seen_id = Some(id.clone()); + seen = db.rooms.state_full(&state_hash)?; + if let Some(pdu) = db.rooms.get_pdu(&id)? { + if pdu.state_key.is_some() { + // This becomes the state after the common event + seen.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu, + ); + } + } + break 'inner; + } + // We need to fill in information about this event's `prev_events` (parents) + None => { + match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), + ) + .await + { + Ok(res) => { + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); + missing.push(PrevEvents::Sequential(prev_pdu)); + } + // We can't hard fail because there are some valid errors, just + // keep checking PDU's + // + // As an example a possible error + // {"errcode":"M_FORBIDDEN","error":"Host not in room."} + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + // We have to give up on this PDU + continue 'outer; + } + }; + } + }, + Some(PrevEvents::Fork(ids)) => { + error!( + "prev_events > 1: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + // Don't kill our server with state-res + // TODO: set this at a reasonable level this is for debug/wip purposes + if ids.len() > 5 { + error!( + "prev_events > 1: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + resolved_map.insert( + event_id, + Err("Previous events are too large for state-res".into()), + ); + continue 'outer; + } - db.rooms.append_to_state(&pdu_id, &pdu)?; + // We want this to stay unique incase the fork comes together? + let mut prev_fork_ids = BTreeSet::new(); + let mut missing_fork = vec![]; + for id in &ids { + match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? 
+ { + // We found a common ancestor + Some(state_hash) => { + seen_id = Some(id.clone()); + seen = db.rooms.state_full(&state_hash)?; + if let Some(pdu) = db.rooms.get_pdu(&id)? { + if pdu.state_key.is_some() { + // This becomes the state after the common event + seen.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu, + ); + } + } + break 'inner; + } + None => match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), + ) + .await + { + Ok(res) => { + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + for id in &prev_pdu.prev_events { + prev_fork_ids.insert(id.clone()); + } + missing_fork.push(prev_pdu); + } + // We can't hard fail because there are some valid errors, just + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + // We have to give up on this PDU + continue 'outer; + } + }, + } + } + prev_ids.push(PrevEvents::new( + &prev_fork_ids.into_iter().collect::>(), + )); + missing.push(PrevEvents::new(&missing_fork)); + } + // All done finding missing events + None => { + break 'inner; + } + } + } - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + // We can treat this event as sequential and simply apply it against the current state of the room + // because we know that state + if missing.is_empty() { + // Back to the original incoming event + // If it is a non state event we still must add it and associate a statehash with the pdu_id + if value.get("state_key").is_none() { + // TODO: Some auth needs to be done for non state events + if !db.rooms.is_joined(&pdu.sender, room_id)? 
{ + error!("Sender is not joined {}", pdu.kind); + resolved_map.insert(event_id, Err("Sender not found in room".into())); + continue 'outer; + } - resolved_map.insert(event_id, Ok::<(), String>(())); - continue; + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue 'outer; + } else { + let incoming = pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + incoming.clone(), + &seen + .iter() + .map(|(k, v)| (k.clone(), v.event_id.clone())) + .collect::>(), + Some( + seen.iter() + .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) + .collect::>(), + ), // TODO: make mut and keep around, this is all the auth events + &db.rooms, + ) { + Ok(true) => { + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue 'outer; + } + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".to_string())); + error!("Failed sequential event auth for incoming"); + continue 'outer; + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + continue 'outer; + } + } + } } - // We have a state event so we need info for state-res - let get_state_response = match send_request( - &db.globals, - body.body.origin.clone(), - ruma::api::federation::event::get_room_state::v1::Request { - room_id, - event_id: &event_id, - }, - ) - .await - { - Ok(res) => res, - // We can't hard fail because there are some valid errors, just - // keep checking PDU's - // - // As an example a possible error - // {"errcode":"M_FORBIDDEN","error":"Host not in room."} - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - continue; - } - }; + // Well, now we have to actually do a bunch of work :( + // The steps are as follows + // 1. Rebuild the sending servers forward extremity, ignoring our own fork + // a) iterate "oldest" -> most recent authenticating each event with the state after the previous + // b) build a `snapshot_map` containing the state after the event for each missing event (EventId -> StateMap) + // 2. Build our side of the fork (TODO do we have to re-auth these, is state at an event relative to the server its from) + // 3. 
resolve the two states (our current with the state after the most recent missing event) - let their_current_state = get_state_response - .pdus + // Now build up state + let mut state_snapshot = seen .iter() - .chain(get_state_response.auth_chain.iter()) // add auth events - .map(|pdu| { - let (event_id, json) = crate::pdu::process_incoming_pdu(pdu); - ( - event_id.clone(), - Arc::new( - // When creating a StateEvent the event_id arg will be used - // over any found in the json and it will not use ruma::reference_hash - // to generate one - state_res::StateEvent::from_id_canon_obj(event_id, json) - .expect("valid pdu json"), - ), - ) - }) + .map(|(k, v)| (k.clone(), v.event_id.clone())) .collect::>(); - let our_current_state = db.rooms.room_state_full(room_id)?; - // State resolution takes care of these checks + // TODO: So this is super memory inefficient we clone the state_snapshot every time + // we need it for state events and non + let mut snapshot_map = BTreeMap::new(); + snapshot_map.insert(seen_id.clone().unwrap(), state_snapshot.clone()); + + let accum_event_map = seen + .iter() + .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) + .chain( + missing + .iter() + .cloned() + .flatten() + .map(|pdu| (pdu.event_id.clone(), pdu.convert_for_state_res())), + ) + .collect::>>(); + // 4. Passes authorization rules based on the event's auth events, otherwise it is rejected. // 5. Passes authorization rules based on the state at the event, otherwise it is rejected. + for missing_pdu in missing.iter().rev() { + match missing_pdu { + PrevEvents::Sequential(missing_pdu) => { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + // We can't add to DB yet since we don't know if it passes + // current room state + // append_state(&db, &PduEvent::from(&*missing_pdu))?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + // Keep track of the state after for resolution + snapshot_map.insert(missing_pdu.event_id(), state_snapshot.clone()); + } + Ok(false) => { + error!( + "apply missing: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("{}", e); + // This is not a fatal error but we do eventually need to handle + // events failing that are not the incoming events + // TODO: what to do when missing events fail (not incoming events) + } + } + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("fork Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms + continue; + } - // TODO: 6. Passes authorization rules based on the current state of the room, otherwise it is "soft failed". 
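Illustrative sketch, not part of the patch: the shape of the `state_snapshot` / `snapshot_map` bookkeeping used above, with plain `String`s and `BTreeMap`s standing in for ruma's `EventType`/`EventId` and `state_res::StateMap` (that substitution is an assumption made only to keep the example self-contained).

use std::collections::BTreeMap;

// Sketch only: `state_snapshot` maps a state key (event type, state_key) to the
// event id currently holding that position, and `snapshot_map` records the room
// state *after* each missing event, so forks can later be compared and resolved.
fn main() {
    // (event type, state_key) -> event id
    let mut state_snapshot: BTreeMap<(String, String), String> = BTreeMap::new();
    // event id -> the room state after that event
    let mut snapshot_map: BTreeMap<String, BTreeMap<(String, String), String>> = BTreeMap::new();

    // Applying a missing state event updates the running snapshot...
    state_snapshot.insert(
        ("m.room.topic".to_owned(), String::new()),
        "$missing_1:example.org".to_owned(),
    );
    // ...and the post-event state is kept around for later state resolution.
    snapshot_map.insert("$missing_1:example.org".to_owned(), state_snapshot.clone());

    assert_eq!(snapshot_map["$missing_1:example.org"].len(), 1);
}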
- match state_res::StateResolution::resolve( - room_id, - &ruma::RoomVersionId::Version6, - &[ - our_current_state - .iter() - .map(|((ev, sk), v)| ((ev.clone(), sk.to_owned()), v.event_id.clone())) - .collect::>(), - their_current_state - .iter() - .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id())) - .collect::>(), - ], - Some( - our_current_state - .iter() - .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) - .chain( - their_current_state - .iter() - .map(|(id, ev)| (id.clone(), ev.clone())), - ) - .collect::>(), - ), - &db.rooms, - ) { - Ok(resolved) if resolved.values().any(|id| &event_id == id) => { - // If the event is older than the last event in pduid_pdu Tree then find the - // closest ancestor we know of and insert after the known ancestor by - // altering the known events pduid to = same roomID + same count bytes + 0x1 - // pushing a single byte every time a simple append cannot be done. - match db.rooms.get_latest_pduid_before( - room_id, - &pdu.prev_events, - &their_current_state, - )? { - Some(ClosestParent::Append) => { + // TODO: a better way to signal non state events... + snapshot_map.insert(missing_pdu.event_id.clone(), state_snapshot.clone()); + // Continue this inner for loop adding all the missing events + } + } + PrevEvents::Fork(pdus) => { + let mut state_sets = BTreeSet::new(); + for pdu in pdus { + let mut state_at = state_snapshot.clone(); + if pdu.state_key.is_some() { + state_at.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu.event_id.clone(), + ); + } + state_sets.insert(state_at); + } + + if state_sets.len() <= 1 { + for missing_pdu in pdus.iter() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + snapshot_map + .insert(missing_pdu.event_id(), state_snapshot.clone()); + } + Ok(false) => { + error!( + "apply missing fork: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("fork state-res: {}", e); + // TODO: what to do when missing events fail (not incoming events) + } + } + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("fork Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms + continue; + } + snapshot_map + .insert(missing_pdu.event_id.clone(), state_snapshot.clone()); + // Continue this inner for loop adding all the missing events + } + } + } else { + match state_res::StateResolution::resolve( + room_id, + &RoomVersionId::Version6, + &state_sets.into_iter().collect::>(), + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? 
+ &db.rooms, + ) { + Ok(resolved) => { + for id in resolved + .values() + .filter(|id| pdus.iter().any(|pduid| pduid.event_id == **id)) + { + snapshot_map.insert(id.clone(), resolved.clone()); + } + } + Err(err) => { + error!("{}", err); + } + } + } + } + } + } + + // We know these events are valid for each state at the event + // we have already authed them + let known_fork = if let Some(id) = seen_id { + db.rooms.state_from(room_id, &id)? + } else { + vec![] + }; + for pdu in known_fork.clone().into_iter().rev() { + if pdu.state_key.is_some() { + seen.insert((pdu.kind.clone(), pdu.state_key.clone().unwrap()), pdu); + } + } + + let current_room_state = seen + .into_iter() + .map(|(k, v)| (k, v.event_id)) + .collect::>(); + + let mut state_sets = BTreeSet::new(); + state_sets.insert(current_room_state.clone()); + + // The first item is the most recent + if let Some(fork) = missing.first() { + for pdu in fork.clone().into_iter() { + if let Some(map) = snapshot_map.get(&pdu.event_id) { + state_sets.insert(map.clone()); + } + } + } + + let mut state_sets = state_sets.into_iter().collect::>(); + // If we have actual differences resolve + if state_sets.len() > 1 { + // Set the incoming event to have parents from both forks + // ours/theirs + pdu.prev_events = missing + .first() + .cloned() + .into_iter() + .flatten() + .map(|pdu| pdu.event_id) + .chain(known_fork.first().cloned().map(|pdu| pdu.event_id)) + .collect(); + + // The incoming event is a child of both forks now + for set in &mut state_sets { + set.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu.event_id.clone(), + ); + } + + // If we have holes or a fork I am less sure what can be guaranteed about our state? + // Or what must be done to fix holes and forks? + match state_res::StateResolution::resolve( + room_id, + &RoomVersionId::Version6, + &state_sets, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? 
+ &db.rooms, + ) { + Ok(resolved) if resolved.values().any(|id| id == &event_id) => { + for resolved_missing in missing + .into_iter() + .rev() // We want the oldest pdu's first + .flatten() + .filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id)) + { let count = db.globals.next_count()?; let mut pdu_id = room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); - db.rooms.append_pdu( - &pdu, - &value, + &resolved_missing, + &crate::utils::to_canonical_object(&resolved_missing) + .expect("PDU is valid canonical JSON"), count, pdu_id.into(), &db.globals, @@ -595,18 +968,58 @@ pub async fn send_transaction_message_route<'a>( &db.admin, )?; } - Some(ClosestParent::Insert(old_count)) => { - let count = old_count; + + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // Since we know the state we force_state + // saving the incoming pdu's id with our new state + db.rooms.force_state_with_pdu( + &*pdu_id, + room_id, + resolved + .iter() + .map(|(k, v)| { + ( + k.clone(), + serde_json::to_vec( + &db.rooms + .get_pdu(v) + .expect("db err") + .expect("we know of all the pdus"), + ) + .expect("serde can serialize pdus"), + ) + }) + .collect(), + )?; + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + resolved_map.insert(event_id, Ok::<(), String>(())); + } + Ok(resolved) => { + for resolved_missing in missing + .into_iter() + .flatten() + .filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id)) + { + let count = db.globals.next_count()?; let mut pdu_id = room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); - // Create a new count that is after old_count but before - // the pdu appended after - pdu_id.push(1); - db.rooms.append_pdu( - &pdu, - &value, + &resolved_missing, + &crate::utils::to_canonical_object(&resolved_missing) + .expect("PDU is valid canonical JSON"), count, pdu_id.into(), &db.globals, @@ -614,28 +1027,78 @@ pub async fn send_transaction_message_route<'a>( &db.admin, )?; } - _ => { - error!("Not a sequential event or no parents found"); - continue; - } + resolved_map.insert(event_id, Err("Failed event auth".into())); + error!( + "auth failed: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + } + } + // If we have iterated through the incoming missing events sequentially we know that + // the original incoming event is the youngest child and so can be simply authed and appended + // to the state + } else { + // If it is a non state event we still must add it and associate a statehash with the pdu_id + if pdu.state_key.is_none() { + // TODO: Some auth needs to be done for non state events + if !db.rooms.is_joined(&pdu.sender, room_id)? { + error!("Sender is not joined {}", pdu.kind); + resolved_map.insert(event_id, Err("Sender not found in room".into())); + continue 'outer; } + append_state(&db, &pdu)?; resolved_map.insert(event_id, Ok::<(), String>(())); + } else { + let incoming = pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + incoming, + ¤t_room_state, + Some(accum_event_map), // TODO: make mut and check on Ok(..) ? 
+ &db.rooms, + ) { + Ok(true) => { + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + } + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".to_string())); + error!("Failed sequential event auth for incoming"); + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + } + } } - // If the eventId is not found in the resolved state auth has failed - Ok(_) => { - resolved_map.insert( - event_id, - Err("This event failed authentication, not found in resolved set".into()), - ); - } - Err(e) => { - resolved_map.insert(event_id, Err(e.to_string())); - } - }; + } } - Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) + Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) +} + +fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { + let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, pdu)?; + db.rooms.append_pdu( + pdu, + &utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + ) } #[cfg_attr(