From 8093e29b1a1d50cfa0e3618feca88def88ed5b69 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 7 Dec 2020 17:08:21 -0500 Subject: [PATCH] Basic federation /send functionality --- Cargo.lock | 214 +++----------------- Cargo.toml | 2 +- src/client_server/media.rs | 3 +- src/database/rooms.rs | 4 +- src/server_server.rs | 399 +++++++++++++++++++------------------ 5 files changed, 233 insertions(+), 389 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5062b8c..0520b0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,15 +21,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi 0.3.9", -] - [[package]] name = "arrayref" version = "0.3.6" @@ -182,19 +173,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" -dependencies = [ - "libc", - "num-integer", - "num-traits", - "time 0.1.44", - "winapi 0.3.9", -] - [[package]] name = "color_quant" version = "1.1.0" @@ -254,7 +232,7 @@ version = "0.15.0-dev" source = "git+https://github.com/SergioBenitez/cookie-rs.git?rev=1c3ca83#1c3ca838543b60a4448d279dc4b903cc7a2bc22a" dependencies = [ "percent-encoding", - "time 0.2.23", + "time", "version_check", ] @@ -575,19 +553,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "generator" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" -dependencies = [ - "cc", - "libc", - "log", - "rustc_version", - "winapi 0.3.9", -] - [[package]] name = "getrandom" version = "0.1.15" @@ -596,7 +561,7 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -886,9 +851,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "linked-hash-map" @@ -914,19 +879,6 @@ dependencies = [ "cfg-if 0.1.10", ] -[[package]] -name = "loom" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" -dependencies = [ - "cfg-if 0.1.10", - "generator", - "scoped-tls", - "serde", - "serde_json", -] - [[package]] name = "lru-cache" version = "0.1.2" @@ -948,15 +900,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - [[package]] name = "matches" version = "0.1.8" @@ -1468,31 +1411,6 @@ dependencies = [ "syn", ] -[[package]] -name = "regex" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" -dependencies = [ - "regex-syntax", -] - -[[package]] -name = "regex-automata" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" -dependencies = [ - "byteorder", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" - [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1585,7 +1503,7 @@ dependencies = [ "rocket_http", "serde", "state", - "time 0.2.23", + "time", "tokio", "ubyte", "version_check", @@ -1622,7 +1540,7 @@ dependencies = [ "ref-cast", "smallvec", "state", - "time 0.2.23", + "time", "tokio", "tokio-rustls", "uncased", @@ -1633,7 +1551,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "js_int", @@ -1651,7 +1569,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "http", "percent-encoding", @@ -1666,7 +1584,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1677,7 +1595,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "ruma-api", "ruma-common", @@ -1691,7 +1609,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "http", @@ -1710,7 +1628,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "maplit", @@ -1723,7 +1641,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-common", @@ -1737,7 +1655,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1748,7 +1666,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.0.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-api", @@ -1763,7 +1681,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "paste", "rand", @@ -1777,7 +1695,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro2", "quote", @@ -1788,7 +1706,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.1.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "serde", ] @@ -1796,7 +1714,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.2.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "form_urlencoded", "itoa", @@ -1809,7 +1727,7 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1820,7 +1738,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-dev.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "base64 0.12.3", "ring", @@ -1948,18 +1866,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" +checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" +checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" dependencies = [ "proc-macro2", "quote", @@ -1995,16 +1913,6 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" -[[package]] -name = "sharded-slab" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" -dependencies = [ - "lazy_static", - "loom", -] - [[package]] name = "signal-hook-registry" version = "1.2.2" @@ -2078,17 +1986,14 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#99214e6fa6b9843b0d9e1f6ef0698d7fdb234fb2" dependencies = [ "itertools", - "js_int", "maplit", "ruma", "serde", "serde_json", "thiserror", "tracing", - "tracing-subscriber", ] [[package]] @@ -2206,26 +2111,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi 0.3.9", -] - [[package]] name = "time" version = "0.2.23" @@ -2407,49 +2292,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", -] - [[package]] name = "trust-dns-proto" version = "0.19.6" @@ -2599,12 +2441,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasm-bindgen" version = "0.2.69" diff --git a/Cargo.toml b/Cargo.toml index b1dec17..757a9b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f33 #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "e8882fe8142d7b55ed4c8ccc6150946945f9e237" } +ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "ee814aa84934530d76f5e4b275d739805b49bdef" } # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "unstable-join" } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } diff --git a/src/client_server/media.rs b/src/client_server/media.rs index e6bd182..f31bff1 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -45,7 +45,8 @@ pub async fn create_content_route( db.flush().await?; - Ok(create_content::Response { content_uri: mxc }.into()) + // TODO: blurhash is the other field not sure if that is important + Ok(create_content::Response::new(mxc).into()) } #[cfg_attr( diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 5080594..a9005f4 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -269,7 +269,7 @@ impl Rooms { pub fn force_state( &self, room_id: &RoomId, - state: HashMap<(EventType, String), Vec>, + state: &HashMap<(EventType, String), Vec>, ) -> Result<()> { let state_hash = self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; @@ -281,7 +281,7 @@ impl Rooms { state_id.extend_from_slice(&event_type.as_ref().as_bytes()); state_id.push(0xff); state_id.extend_from_slice(&state_key.as_bytes()); - self.stateid_pduid.insert(state_id, pdu_id)?; + self.stateid_pduid.insert(state_id, pdu_id.to_vec())?; } self.roomid_statehash diff --git a/src/server_server.rs b/src/server_server.rs index 0f0c1a4..047847c 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -21,7 +21,7 @@ use ruma::{ OutgoingRequest, }, directory::{IncomingFilter, IncomingRoomNetwork}, - EventId, RoomId, ServerName, ServerSigningKeyId, UserId, + EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use std::{ collections::BTreeMap, @@ -445,7 +445,14 @@ pub async fn send_transaction_message_route<'a>( // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. let mut resolved_map = BTreeMap::new(); - for pdu in &body.pdus { + let mut pdu_idx = 0; + // This is `for pdu in &body.pdus` but we need to do fancy continue/break so we need loop labels + 'outer: loop { + if pdu_idx == body.pdus.len() { + break 'outer; + } + let pdu = &body.pdus[pdu_idx]; + pdu_idx += 1; // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. // state-res checks signatures - 2. Passes signature checks, otherwise event is dropped. @@ -467,12 +474,13 @@ pub async fn send_transaction_message_route<'a>( } // The events that must be resolved to catch up to the incoming event - let mut missing = BTreeMap::new(); + let mut missing = vec![]; let mut seen = BTreeMap::new(); let mut prev_ids = pdu.prev_events.to_vec(); // Don't kill our server with state-res - if prev_ids.len() > 20 { + // TODO: set this at a reasonable level this is for debug/wip purposes + if prev_ids.len() > 5 { resolved_map.insert( event_id, Err("Event has abnormally large prev_events count".into()), @@ -480,39 +488,48 @@ pub async fn send_transaction_message_route<'a>( continue; } - while let Some(id) = prev_ids.pop() { + // This is `while let Some(event_id) = prev_ids.pop()` but with a fancy continue + // in the case of a failed request to the server that sent the event + 'inner: loop { + let id = if let Some(id) = prev_ids.pop() { + id + } else { + break 'inner; + }; + match db .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { + // We know the state snapshot for this events parents so we can simply auth the + // incoming event and append to DB and append to state if it passes Some(state_hash) => { seen = db.rooms.state_full(&state_hash)?; - break; + break 'inner; } + // We need to fill in information about this event's `prev_events` (parents) None => { - // TODO: as soon as https://github.com/ruma/ruma/pull/364 is accepted - // use `::get_event::v1::Request...` - let last_event = vec![id.clone()]; - let mut req = - ruma::api::federation::event::get_missing_events::v1::Request::new( - room_id, - &[], - &last_event, - ); - req.limit = ruma::uint!(1); // We have a state event so we need info for state-res - let get_event = match send_request(&db.globals, body.body.origin.clone(), req) - .await + match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), + ) + .await { Ok(res) => { - for ev in &res.events { - let (id, val) = crate::pdu::process_incoming_pdu(ev); - let pdu = state_res::StateEvent::from_id_canon_obj(id.clone(), val) - .expect("Pdu is a valid StateEvent"); - - prev_ids.extend(pdu.prev_event_ids()); - missing.insert(id, pdu); - } + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + prev_ids.extend(prev_pdu.prev_events.to_vec()); + missing.push(prev_pdu); } // We can't hard fail because there are some valid errors, just // keep checking PDU's @@ -521,18 +538,142 @@ pub async fn send_transaction_message_route<'a>( // {"errcode":"M_FORBIDDEN","error":"Host not in room."} Err(err) => { resolved_map.insert(event_id, Err(err.to_string())); - continue; + // We have to give up on this PDU + continue 'outer; } }; } } } - // If it is not a state event, we can skip state-res... maybe + // Now build up + let mut state_snapshot = seen + .iter() + .map(|(k, v)| (k.clone(), v.event_id.clone())) + .collect(); + let mut accum_event_map = seen + .iter() + .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) + .collect::>>(); + // TODO: this only accounts for sequentially missing events no holes will be filled + // and I'm still not sure what happens when fork introduces multiple `prev_events` + // + // We need to go from oldest (furthest ancestor of the incoming event) to the + // prev_event of the incoming event so we reverse the order oldest -> most recent + for missing_pdu in missing.into_iter().rev() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // Since we know the state from event to event we can do the + // slightly more efficient force_state + db.rooms.force_state( + room_id, + state_snapshot + .iter() + .map(|(k, v)| { + ( + k.clone(), + serde_json::to_vec( + &db.rooms + .get_pdu(v) + .expect("db err") + .expect("we know of all the pdus"), + ) + .expect("serde can serialize pdus"), + ) + }) + .collect(), + )?; + // Now append the new missing event to DB it will be part of the + // next events state_snapshot + db.rooms.append_pdu( + &PduEvent::from(&*missing_pdu), + &utils::to_canonical_object(&*missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + } + Ok(false) => { + error!("{:?}", missing_pdu); + continue; + } + Err(e) => { + error!("{}", e); + // I think we need to keep going until we reach + // the incoming pdu so do not error here + continue; // The continue is not needed but to remind us that this is not an error + } + } + // All between the event we know about and the incoming event need to be accounted + // for + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("Sender is not joined {}", missing_pdu.kind); + // I think we need to keep going until we reach + // the incoming pdu so do not error here + continue; + } + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, &missing_pdu)?; + db.rooms.append_pdu( + &missing_pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + // Continue this inner for loop adding all the missing events + } + } + + // Back to the original incoming event + // If it is a non state event we still must add it and create a statehash for it if value.get("state_key").is_none() { + // TODO: Some auth needs to be done for non state events if !db.rooms.is_joined(&pdu.sender, room_id)? { - warn!("Sender is not joined {}", pdu.kind); - resolved_map.insert(event_id, Err("User is not in this room".into())); + error!("Sender is not joined {}", pdu.kind); + // I think we need to keep going until we reach + // the incoming pdu so do not error here continue; } @@ -551,183 +692,49 @@ pub async fn send_transaction_message_route<'a>( &db.account_data, &db.admin, )?; - resolved_map.insert(event_id, Ok::<(), String>(())); continue; } - // We have a state event so we need info for state-res - let get_state_response = match send_request( - &db.globals, - body.body.origin.clone(), - ruma::api::federation::event::get_room_state::v1::Request { - room_id, - event_id: &event_id, - }, - ) - .await - { - Ok(res) => res, - // We can't hard fail because there are some valid errors, just - // keep checking PDU's - // - // As an example a possible error - // {"errcode":"M_FORBIDDEN","error":"Host not in room."} - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - continue; - } - }; - - let their_current_state = get_state_response - .pdus - .iter() - // .chain(get_state_response.auth_chain.iter()) // add auth events - .map(|pdu| { - let (event_id, json) = crate::pdu::process_incoming_pdu(pdu); - ( - event_id.clone(), - Arc::new( - // When creating a StateEvent the event_id arg will be used - // over any found in the json and it will not use ruma::reference_hash - // to generate one - state_res::StateEvent::from_id_canon_obj(event_id, json) - .expect("valid pdu json"), - ), - ) - }) - // Add the incoming event to their state this will ensure it is within the - // resolved state if indeed it passes state-res - .chain(Some(( - event_id.clone(), - Arc::new( - state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone()) - .expect("valid pdu json"), - ), - ))) - .collect::>(); - - let our_current_state = db.rooms.room_state_full(room_id)?; - // State resolution takes care of these checks - // 4. Passes authorization rules based on the event's auth events, otherwise it is rejected. - // 5. Passes authorization rules based on the state at the event, otherwise it is rejected. - - // TODO: 6. Passes authorization rules based on the current state of the room, otherwise it is "soft failed". - match state_res::StateResolution::resolve( + // If we have iterated through the incoming missing events sequentially we know that + // the original incoming event is the youngest child and so can be simply authed and append + // to the state + // If we have holes or a fork I am less sure what can be guaranteed about our state? + match state_res::StateResolution::apply_event( room_id, - &ruma::RoomVersionId::Version6, - &[ - our_current_state - .iter() - .map(|((ev, sk), v)| ((ev.clone(), sk.to_owned()), v.event_id.clone())) - .collect::>(), - their_current_state - .iter() - .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id())) - // We must ensure that our incoming event is part of state-res and not - // accidentally removed from the BTree because of being sorted first by - // event_id - .chain(Some(( - ( - pdu.kind.clone(), - pdu.state_key - .clone() - .expect("Found state event without state_key"), - ), - pdu.event_id.clone(), - ))) - .collect::>(), - ], - Some( - our_current_state - .iter() - .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) - .chain( - their_current_state - .iter() - .map(|(id, ev)| (id.clone(), ev.clone())), - ) - .collect::>(), - ), + &RoomVersionId::Version6, + pdu.convert_for_state_res(), // We know this a state event + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? &db.rooms, ) { - Ok(resolved) if resolved.values().any(|id| &event_id == id) => { - let res_state = resolved.iter().map(|(k, v)| { - Ok::<_, Error>(( - k.clone(), - db.rooms - .get_pdu(v)? - .ok_or_else(|| Error::bad_database("Pdu with eventId not found"))?, - )) - }); - // If the event is older than the last event in pduid_pdu Tree then find the - // closest ancestor we know of and insert after the known ancestor by - // altering the known events pduid to = same roomID + same count bytes + 0x1 - // pushing a single byte every time a simple append cannot be done. - match db.rooms.get_latest_pduid_before( - room_id, - &pdu.prev_events, - &their_current_state, - )? { - Some(ClosestParent::Append) => { - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - } - Some(ClosestParent::Insert(old_count)) => { - let count = old_count; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - // Create a new count that is after old_count but before - // the pdu appended after - pdu_id.push(1); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - } - _ => { - error!("Not a sequential event or no parents found"); - continue; - } - } - + Ok(true) => { + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; resolved_map.insert(event_id, Ok::<(), String>(())); } - // If the eventId is not found in the resolved state auth has failed - Ok(res) => { - dbg!(res); - resolved_map.insert( - event_id, - Err("This event failed authentication, not found in resolved set".into()), - ); + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".into())); + error!("{:?}", pdu); } - Err(e) => { - resolved_map.insert(event_id, Err(e.to_string())); + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); } - }; + } } Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())