From 2864a7097f571c3dfa62f2e8fbcc47c63d9b2e05 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 7 Dec 2020 10:32:09 -0500 Subject: [PATCH 1/6] Wip work to correctly resolve incoming PDUs --- src/database/rooms.rs | 2 +- src/server_server.rs | 103 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index fb139a6..5080594 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -205,7 +205,7 @@ impl Rooms { }) } - /// Returns the last state hash key added to the db. + /// Returns the state hash key for the given pdu. pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result> { Ok(self.pduid_statehash.get(pdu_id)?) } diff --git a/src/server_server.rs b/src/server_server.rs index da046d3..0f0c1a4 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -466,6 +466,68 @@ pub async fn send_transaction_message_route<'a>( continue; } + // The events that must be resolved to catch up to the incoming event + let mut missing = BTreeMap::new(); + let mut seen = BTreeMap::new(); + + let mut prev_ids = pdu.prev_events.to_vec(); + // Don't kill our server with state-res + if prev_ids.len() > 20 { + resolved_map.insert( + event_id, + Err("Event has abnormally large prev_events count".into()), + ); + continue; + } + + while let Some(id) = prev_ids.pop() { + match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + Some(state_hash) => { + seen = db.rooms.state_full(&state_hash)?; + break; + } + None => { + // TODO: as soon as https://github.com/ruma/ruma/pull/364 is accepted + // use `::get_event::v1::Request...` + let last_event = vec![id.clone()]; + let mut req = + ruma::api::federation::event::get_missing_events::v1::Request::new( + room_id, + &[], + &last_event, + ); + req.limit = ruma::uint!(1); + // We have a state event so we need info for state-res + let get_event = match send_request(&db.globals, body.body.origin.clone(), req) + .await + { + Ok(res) => { + for ev in &res.events { + let (id, val) = crate::pdu::process_incoming_pdu(ev); + let pdu = state_res::StateEvent::from_id_canon_obj(id.clone(), val) + .expect("Pdu is a valid StateEvent"); + + prev_ids.extend(pdu.prev_event_ids()); + missing.insert(id, pdu); + } + } + // We can't hard fail because there are some valid errors, just + // keep checking PDU's + // + // As an example a possible error + // {"errcode":"M_FORBIDDEN","error":"Host not in room."} + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + continue; + } + }; + } + } + } + // If it is not a state event, we can skip state-res... maybe if value.get("state_key").is_none() { if !db.rooms.is_joined(&pdu.sender, room_id)? 
{ @@ -480,7 +542,6 @@ pub async fn send_transaction_message_route<'a>( pdu_id.extend_from_slice(&count.to_be_bytes()); db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_pdu( &pdu, &value, @@ -521,7 +582,7 @@ pub async fn send_transaction_message_route<'a>( let their_current_state = get_state_response .pdus .iter() - .chain(get_state_response.auth_chain.iter()) // add auth events + // .chain(get_state_response.auth_chain.iter()) // add auth events .map(|pdu| { let (event_id, json) = crate::pdu::process_incoming_pdu(pdu); ( @@ -535,6 +596,15 @@ pub async fn send_transaction_message_route<'a>( ), ) }) + // Add the incoming event to their state this will ensure it is within the + // resolved state if indeed it passes state-res + .chain(Some(( + event_id.clone(), + Arc::new( + state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone()) + .expect("valid pdu json"), + ), + ))) .collect::>(); let our_current_state = db.rooms.room_state_full(room_id)?; @@ -554,6 +624,18 @@ pub async fn send_transaction_message_route<'a>( their_current_state .iter() .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id())) + // We must ensure that our incoming event is part of state-res and not + // accidentally removed from the BTree because of being sorted first by + // event_id + .chain(Some(( + ( + pdu.kind.clone(), + pdu.state_key + .clone() + .expect("Found state event without state_key"), + ), + pdu.event_id.clone(), + ))) .collect::>(), ], Some( @@ -570,6 +652,14 @@ pub async fn send_transaction_message_route<'a>( &db.rooms, ) { Ok(resolved) if resolved.values().any(|id| &event_id == id) => { + let res_state = resolved.iter().map(|(k, v)| { + Ok::<_, Error>(( + k.clone(), + db.rooms + .get_pdu(v)? + .ok_or_else(|| Error::bad_database("Pdu with eventId not found"))?, + )) + }); // If the event is older than the last event in pduid_pdu Tree then find the // closest ancestor we know of and insert after the known ancestor by // altering the known events pduid to = same roomID + same count bytes + 0x1 @@ -585,6 +675,8 @@ pub async fn send_transaction_message_route<'a>( pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( &pdu, &value, @@ -604,6 +696,8 @@ pub async fn send_transaction_message_route<'a>( // the pdu appended after pdu_id.push(1); + db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( &pdu, &value, @@ -623,7 +717,8 @@ pub async fn send_transaction_message_route<'a>( resolved_map.insert(event_id, Ok::<(), String>(())); } // If the eventId is not found in the resolved state auth has failed - Ok(_) => { + Ok(res) => { + dbg!(res); resolved_map.insert( event_id, Err("This event failed authentication, not found in resolved set".into()), @@ -635,7 +730,7 @@ pub async fn send_transaction_message_route<'a>( }; } - Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) + Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) } #[cfg_attr( From 8093e29b1a1d50cfa0e3618feca88def88ed5b69 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 7 Dec 2020 17:08:21 -0500 Subject: [PATCH 2/6] Basic federation /send functionality --- Cargo.lock | 214 +++----------------- Cargo.toml | 2 +- src/client_server/media.rs | 3 +- src/database/rooms.rs | 4 +- src/server_server.rs | 399 +++++++++++++++++++------------------ 5 files changed, 233 insertions(+), 389 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5062b8c..0520b0d 100644 --- a/Cargo.lock +++ 
b/Cargo.lock @@ -21,15 +21,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi 0.3.9", -] - [[package]] name = "arrayref" version = "0.3.6" @@ -182,19 +173,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" -dependencies = [ - "libc", - "num-integer", - "num-traits", - "time 0.1.44", - "winapi 0.3.9", -] - [[package]] name = "color_quant" version = "1.1.0" @@ -254,7 +232,7 @@ version = "0.15.0-dev" source = "git+https://github.com/SergioBenitez/cookie-rs.git?rev=1c3ca83#1c3ca838543b60a4448d279dc4b903cc7a2bc22a" dependencies = [ "percent-encoding", - "time 0.2.23", + "time", "version_check", ] @@ -575,19 +553,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "generator" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" -dependencies = [ - "cc", - "libc", - "log", - "rustc_version", - "winapi 0.3.9", -] - [[package]] name = "getrandom" version = "0.1.15" @@ -596,7 +561,7 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -886,9 +851,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "linked-hash-map" @@ -914,19 +879,6 @@ dependencies = [ "cfg-if 0.1.10", ] -[[package]] -name = "loom" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" -dependencies = [ - "cfg-if 0.1.10", - "generator", - "scoped-tls", - "serde", - "serde_json", -] - [[package]] name = "lru-cache" version = "0.1.2" @@ -948,15 +900,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - [[package]] name = "matches" version = "0.1.8" @@ -1468,31 +1411,6 @@ dependencies = [ "syn", ] -[[package]] -name = "regex" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" -dependencies = [ - "regex-syntax", -] - -[[package]] -name = "regex-automata" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" -dependencies = [ - "byteorder", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" - [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1585,7 +1503,7 @@ dependencies = [ "rocket_http", "serde", "state", - "time 0.2.23", + "time", "tokio", "ubyte", "version_check", @@ -1622,7 +1540,7 @@ dependencies = [ "ref-cast", "smallvec", "state", - "time 0.2.23", + "time", "tokio", "tokio-rustls", "uncased", @@ -1633,7 +1551,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "js_int", @@ -1651,7 +1569,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "http", "percent-encoding", @@ -1666,7 +1584,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1677,7 +1595,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "ruma-api", "ruma-common", @@ -1691,7 +1609,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "http", @@ -1710,7 +1628,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "maplit", @@ -1723,7 +1641,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-common", @@ -1737,7 +1655,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = 
"0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1748,7 +1666,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.0.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-api", @@ -1763,7 +1681,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "paste", "rand", @@ -1777,7 +1695,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro2", "quote", @@ -1788,7 +1706,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.1.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "serde", ] @@ -1796,7 +1714,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.2.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "form_urlencoded", "itoa", @@ -1809,7 +1727,7 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1820,7 +1738,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-dev.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "base64 0.12.3", "ring", @@ -1948,18 +1866,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" +checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800" dependencies = [ 
"serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" +checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" dependencies = [ "proc-macro2", "quote", @@ -1995,16 +1913,6 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" -[[package]] -name = "sharded-slab" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" -dependencies = [ - "lazy_static", - "loom", -] - [[package]] name = "signal-hook-registry" version = "1.2.2" @@ -2078,17 +1986,14 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#99214e6fa6b9843b0d9e1f6ef0698d7fdb234fb2" dependencies = [ "itertools", - "js_int", "maplit", "ruma", "serde", "serde_json", "thiserror", "tracing", - "tracing-subscriber", ] [[package]] @@ -2206,26 +2111,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi 0.3.9", -] - [[package]] name = "time" version = "0.2.23" @@ -2407,49 +2292,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", -] - [[package]] name = "trust-dns-proto" version = "0.19.6" @@ -2599,12 +2441,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasm-bindgen" version = "0.2.69" diff --git a/Cargo.toml b/Cargo.toml index b1dec17..757a9b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ rocket 
= { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f33 #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "e8882fe8142d7b55ed4c8ccc6150946945f9e237" } +ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "ee814aa84934530d76f5e4b275d739805b49bdef" } # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "unstable-join" } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } diff --git a/src/client_server/media.rs b/src/client_server/media.rs index e6bd182..f31bff1 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -45,7 +45,8 @@ pub async fn create_content_route( db.flush().await?; - Ok(create_content::Response { content_uri: mxc }.into()) + // TODO: blurhash is the other field not sure if that is important + Ok(create_content::Response::new(mxc).into()) } #[cfg_attr( diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 5080594..a9005f4 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -269,7 +269,7 @@ impl Rooms { pub fn force_state( &self, room_id: &RoomId, - state: HashMap<(EventType, String), Vec>, + state: &HashMap<(EventType, String), Vec>, ) -> Result<()> { let state_hash = self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; @@ -281,7 +281,7 @@ impl Rooms { state_id.extend_from_slice(&event_type.as_ref().as_bytes()); state_id.push(0xff); state_id.extend_from_slice(&state_key.as_bytes()); - self.stateid_pduid.insert(state_id, pdu_id)?; + self.stateid_pduid.insert(state_id, pdu_id.to_vec())?; } self.roomid_statehash diff --git a/src/server_server.rs b/src/server_server.rs index 0f0c1a4..047847c 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -21,7 +21,7 @@ use ruma::{ OutgoingRequest, }, directory::{IncomingFilter, IncomingRoomNetwork}, - EventId, RoomId, ServerName, ServerSigningKeyId, UserId, + EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use std::{ collections::BTreeMap, @@ -445,7 +445,14 @@ pub async fn send_transaction_message_route<'a>( // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. let mut resolved_map = BTreeMap::new(); - for pdu in &body.pdus { + let mut pdu_idx = 0; + // This is `for pdu in &body.pdus` but we need to do fancy continue/break so we need loop labels + 'outer: loop { + if pdu_idx == body.pdus.len() { + break 'outer; + } + let pdu = &body.pdus[pdu_idx]; + pdu_idx += 1; // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. // state-res checks signatures - 2. Passes signature checks, otherwise event is dropped. 
@@ -467,12 +474,13 @@ pub async fn send_transaction_message_route<'a>( } // The events that must be resolved to catch up to the incoming event - let mut missing = BTreeMap::new(); + let mut missing = vec![]; let mut seen = BTreeMap::new(); let mut prev_ids = pdu.prev_events.to_vec(); // Don't kill our server with state-res - if prev_ids.len() > 20 { + // TODO: set this at a reasonable level this is for debug/wip purposes + if prev_ids.len() > 5 { resolved_map.insert( event_id, Err("Event has abnormally large prev_events count".into()), @@ -480,39 +488,48 @@ pub async fn send_transaction_message_route<'a>( continue; } - while let Some(id) = prev_ids.pop() { + // This is `while let Some(event_id) = prev_ids.pop()` but with a fancy continue + // in the case of a failed request to the server that sent the event + 'inner: loop { + let id = if let Some(id) = prev_ids.pop() { + id + } else { + break 'inner; + }; + match db .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { + // We know the state snapshot for this events parents so we can simply auth the + // incoming event and append to DB and append to state if it passes Some(state_hash) => { seen = db.rooms.state_full(&state_hash)?; - break; + break 'inner; } + // We need to fill in information about this event's `prev_events` (parents) None => { - // TODO: as soon as https://github.com/ruma/ruma/pull/364 is accepted - // use `::get_event::v1::Request...` - let last_event = vec![id.clone()]; - let mut req = - ruma::api::federation::event::get_missing_events::v1::Request::new( - room_id, - &[], - &last_event, - ); - req.limit = ruma::uint!(1); // We have a state event so we need info for state-res - let get_event = match send_request(&db.globals, body.body.origin.clone(), req) - .await + match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), + ) + .await { Ok(res) => { - for ev in &res.events { - let (id, val) = crate::pdu::process_incoming_pdu(ev); - let pdu = state_res::StateEvent::from_id_canon_obj(id.clone(), val) - .expect("Pdu is a valid StateEvent"); - - prev_ids.extend(pdu.prev_event_ids()); - missing.insert(id, pdu); - } + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + prev_ids.extend(prev_pdu.prev_events.to_vec()); + missing.push(prev_pdu); } // We can't hard fail because there are some valid errors, just // keep checking PDU's @@ -521,18 +538,142 @@ pub async fn send_transaction_message_route<'a>( // {"errcode":"M_FORBIDDEN","error":"Host not in room."} Err(err) => { resolved_map.insert(event_id, Err(err.to_string())); - continue; + // We have to give up on this PDU + continue 'outer; } }; } } } - // If it is not a state event, we can skip state-res... 
maybe + // Now build up + let mut state_snapshot = seen + .iter() + .map(|(k, v)| (k.clone(), v.event_id.clone())) + .collect(); + let mut accum_event_map = seen + .iter() + .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) + .collect::>>(); + // TODO: this only accounts for sequentially missing events no holes will be filled + // and I'm still not sure what happens when fork introduces multiple `prev_events` + // + // We need to go from oldest (furthest ancestor of the incoming event) to the + // prev_event of the incoming event so we reverse the order oldest -> most recent + for missing_pdu in missing.into_iter().rev() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // Since we know the state from event to event we can do the + // slightly more efficient force_state + db.rooms.force_state( + room_id, + state_snapshot + .iter() + .map(|(k, v)| { + ( + k.clone(), + serde_json::to_vec( + &db.rooms + .get_pdu(v) + .expect("db err") + .expect("we know of all the pdus"), + ) + .expect("serde can serialize pdus"), + ) + }) + .collect(), + )?; + // Now append the new missing event to DB it will be part of the + // next events state_snapshot + db.rooms.append_pdu( + &PduEvent::from(&*missing_pdu), + &utils::to_canonical_object(&*missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + } + Ok(false) => { + error!("{:?}", missing_pdu); + continue; + } + Err(e) => { + error!("{}", e); + // I think we need to keep going until we reach + // the incoming pdu so do not error here + continue; // The continue is not needed but to remind us that this is not an error + } + } + // All between the event we know about and the incoming event need to be accounted + // for + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? 
+ { + error!("Sender is not joined {}", missing_pdu.kind); + // I think we need to keep going until we reach + // the incoming pdu so do not error here + continue; + } + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, &missing_pdu)?; + db.rooms.append_pdu( + &missing_pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + // Continue this inner for loop adding all the missing events + } + } + + // Back to the original incoming event + // If it is a non state event we still must add it and create a statehash for it if value.get("state_key").is_none() { + // TODO: Some auth needs to be done for non state events if !db.rooms.is_joined(&pdu.sender, room_id)? { - warn!("Sender is not joined {}", pdu.kind); - resolved_map.insert(event_id, Err("User is not in this room".into())); + error!("Sender is not joined {}", pdu.kind); + // I think we need to keep going until we reach + // the incoming pdu so do not error here continue; } @@ -551,183 +692,49 @@ pub async fn send_transaction_message_route<'a>( &db.account_data, &db.admin, )?; - resolved_map.insert(event_id, Ok::<(), String>(())); continue; } - // We have a state event so we need info for state-res - let get_state_response = match send_request( - &db.globals, - body.body.origin.clone(), - ruma::api::federation::event::get_room_state::v1::Request { - room_id, - event_id: &event_id, - }, - ) - .await - { - Ok(res) => res, - // We can't hard fail because there are some valid errors, just - // keep checking PDU's - // - // As an example a possible error - // {"errcode":"M_FORBIDDEN","error":"Host not in room."} - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - continue; - } - }; - - let their_current_state = get_state_response - .pdus - .iter() - // .chain(get_state_response.auth_chain.iter()) // add auth events - .map(|pdu| { - let (event_id, json) = crate::pdu::process_incoming_pdu(pdu); - ( - event_id.clone(), - Arc::new( - // When creating a StateEvent the event_id arg will be used - // over any found in the json and it will not use ruma::reference_hash - // to generate one - state_res::StateEvent::from_id_canon_obj(event_id, json) - .expect("valid pdu json"), - ), - ) - }) - // Add the incoming event to their state this will ensure it is within the - // resolved state if indeed it passes state-res - .chain(Some(( - event_id.clone(), - Arc::new( - state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone()) - .expect("valid pdu json"), - ), - ))) - .collect::>(); - - let our_current_state = db.rooms.room_state_full(room_id)?; - // State resolution takes care of these checks - // 4. Passes authorization rules based on the event's auth events, otherwise it is rejected. - // 5. Passes authorization rules based on the state at the event, otherwise it is rejected. - - // TODO: 6. Passes authorization rules based on the current state of the room, otherwise it is "soft failed". - match state_res::StateResolution::resolve( + // If we have iterated through the incoming missing events sequentially we know that + // the original incoming event is the youngest child and so can be simply authed and append + // to the state + // If we have holes or a fork I am less sure what can be guaranteed about our state? 
+ match state_res::StateResolution::apply_event( room_id, - &ruma::RoomVersionId::Version6, - &[ - our_current_state - .iter() - .map(|((ev, sk), v)| ((ev.clone(), sk.to_owned()), v.event_id.clone())) - .collect::>(), - their_current_state - .iter() - .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id())) - // We must ensure that our incoming event is part of state-res and not - // accidentally removed from the BTree because of being sorted first by - // event_id - .chain(Some(( - ( - pdu.kind.clone(), - pdu.state_key - .clone() - .expect("Found state event without state_key"), - ), - pdu.event_id.clone(), - ))) - .collect::>(), - ], - Some( - our_current_state - .iter() - .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) - .chain( - their_current_state - .iter() - .map(|(id, ev)| (id.clone(), ev.clone())), - ) - .collect::>(), - ), + &RoomVersionId::Version6, + pdu.convert_for_state_res(), // We know this a state event + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? &db.rooms, ) { - Ok(resolved) if resolved.values().any(|id| &event_id == id) => { - let res_state = resolved.iter().map(|(k, v)| { - Ok::<_, Error>(( - k.clone(), - db.rooms - .get_pdu(v)? - .ok_or_else(|| Error::bad_database("Pdu with eventId not found"))?, - )) - }); - // If the event is older than the last event in pduid_pdu Tree then find the - // closest ancestor we know of and insert after the known ancestor by - // altering the known events pduid to = same roomID + same count bytes + 0x1 - // pushing a single byte every time a simple append cannot be done. - match db.rooms.get_latest_pduid_before( - room_id, - &pdu.prev_events, - &their_current_state, - )? { - Some(ClosestParent::Append) => { - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - } - Some(ClosestParent::Insert(old_count)) => { - let count = old_count; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - // Create a new count that is after old_count but before - // the pdu appended after - pdu_id.push(1); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - } - _ => { - error!("Not a sequential event or no parents found"); - continue; - } - } - + Ok(true) => { + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; resolved_map.insert(event_id, Ok::<(), String>(())); } - // If the eventId is not found in the resolved state auth has failed - Ok(res) => { - dbg!(res); - resolved_map.insert( - event_id, - Err("This event failed authentication, not found in resolved set".into()), - ); + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".into())); + error!("{:?}", pdu); } - Err(e) => { - resolved_map.insert(event_id, Err(e.to_string())); + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); } - }; + } } 
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) From c7525fdd5ea29a1075ecc48a79f621eeb8488c16 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Sun, 13 Dec 2020 09:16:15 -0500 Subject: [PATCH 3/6] Happy path of incoming /send pdu resolution --- Cargo.lock | 1 + Cargo.toml | 2 +- src/database/rooms.rs | 2 +- src/pdu.rs | 2 +- src/server_server.rs | 490 +++++++++++++++++++++++++++++------------- 5 files changed, 339 insertions(+), 158 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0520b0d..a8fad68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,6 +1986,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" +source = "git+https://github.com/ruma/state-res#dca71f76eea9f1378a54e96e5fc98d71dfbf5dde" dependencies = [ "itertools", "maplit", diff --git a/Cargo.toml b/Cargo.toml index 757a9b8..1c97bdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] } -state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } +state-res = { git = "https://github.com/ruma/state-res", features = ["unstable-pre-spec", "gen-eventid"] } # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/database/rooms.rs b/src/database/rooms.rs index a9005f4..9178eeb 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -269,7 +269,7 @@ impl Rooms { pub fn force_state( &self, room_id: &RoomId, - state: &HashMap<(EventType, String), Vec>, + state: HashMap<(EventType, String), Vec>, ) -> Result<()> { let state_hash = self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; diff --git a/src/pdu.rs b/src/pdu.rs index 75ef492..f6ec415 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -17,7 +17,7 @@ use std::{ time::UNIX_EPOCH, }; -#[derive(Deserialize, Serialize, Debug)] +#[derive(Clone, Deserialize, Serialize, Debug)] pub struct PduEvent { pub event_id: EventId, pub room_id: RoomId, diff --git a/src/server_server.rs b/src/server_server.rs index 047847c..71f731f 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -24,7 +24,7 @@ use ruma::{ EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, convert::TryFrom, fmt::Debug, sync::Arc, @@ -390,6 +390,22 @@ pub async fn get_public_rooms_route( .into()) } +#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum PrevEvents { + Sequential(T), + Fork(Vec), +} + +impl PrevEvents { + pub fn new(id: &[T]) -> Self { + match id { + [] => panic!("All events must have previous event"), + [single_id] => Self::Sequential(single_id.clone()), + rest => Self::Fork(rest.to_vec()), + } + } +} + #[cfg_attr( feature = "conduit_bin", put("/_matrix/federation/v1/send/<_>", data = "") @@ -475,78 +491,140 @@ pub async fn send_transaction_message_route<'a>( // The events that must be resolved to catch up to the incoming event let mut missing = vec![]; - let mut seen = BTreeMap::new(); - - let mut prev_ids = pdu.prev_events.to_vec(); - // Don't kill our server with state-res - // TODO: set this at a reasonable level 
this is for debug/wip purposes - if prev_ids.len() > 5 { - resolved_map.insert( - event_id, - Err("Event has abnormally large prev_events count".into()), - ); - continue; - } + let mut seen = state_res::StateMap::new(); - // This is `while let Some(event_id) = prev_ids.pop()` but with a fancy continue + let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; + + // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue // in the case of a failed request to the server that sent the event 'inner: loop { - let id = if let Some(id) = prev_ids.pop() { - id - } else { - break 'inner; - }; + // TODO: if this is ever more that 1 at a time we must do actual + // full state resolution not just auth + match prev_ids.pop() { + Some(PrevEvents::Sequential(id)) => match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + // We know the state snapshot for this events parents so we can simply auth the + // incoming event and append to DB and append to state if it passes + Some(state_hash) => { + seen = db.rooms.state_full(&state_hash)?; + break 'inner; + } + // We need to fill in information about this event's `prev_events` (parents) + None => { + match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), + ) + .await + { + Ok(res) => { + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); + missing.push(PrevEvents::Sequential(prev_pdu)); + } + // We can't hard fail because there are some valid errors, just + // keep checking PDU's + // + // As an example a possible error + // {"errcode":"M_FORBIDDEN","error":"Host not in room."} + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + // We have to give up on this PDU + continue 'outer; + } + }; + } + }, + Some(PrevEvents::Fork(ids)) => { + error!( + "prev_events > 1: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + // Don't kill our server with state-res + // TODO: set this at a reasonable level this is for debug/wip purposes + if ids.len() > 5 { + error!( + "prev_events > 1: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + resolved_map.insert( + event_id, + Err("Previous events are too large for state-res".into()), + ); + continue 'outer; + } - match db - .rooms - .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? - { - // We know the state snapshot for this events parents so we can simply auth the - // incoming event and append to DB and append to state if it passes - Some(state_hash) => { - seen = db.rooms.state_full(&state_hash)?; - break 'inner; - } - // We need to fill in information about this event's `prev_events` (parents) - None => { - // We have a state event so we need info for state-res - match send_request( - &db.globals, - body.body.origin.clone(), - ruma::api::federation::event::get_event::v1::Request::new(&id), - ) - .await - { - Ok(res) => { - let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); - let prev_pdu = serde_json::from_value::( - serde_json::to_value(&val) - .expect("CanonicalJsonObj is a valid JsonValue"), + // We want this to stay unique incase the fork comes together? 
+ let mut prev_fork_ids = BTreeSet::new(); + let mut missing_fork = vec![]; + for id in &ids { + match db + .rooms + .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? + { + // We know the state snapshot for this events parents so we can simply auth the + // incoming event and append to DB and append to state if it passes + Some(state_hash) => { + seen = db.rooms.state_full(&state_hash)?; + break 'inner; + } + None => match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_event::v1::Request::new(&id), ) - .expect("all ruma pdus are conduit pdus"); - - // TODO: do we need this - assert_eq!(room_id, &prev_pdu.room_id); - - prev_ids.extend(prev_pdu.prev_events.to_vec()); - missing.push(prev_pdu); - } - // We can't hard fail because there are some valid errors, just - // keep checking PDU's - // - // As an example a possible error - // {"errcode":"M_FORBIDDEN","error":"Host not in room."} - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - // We have to give up on this PDU - continue 'outer; + .await + { + Ok(res) => { + let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu); + let prev_pdu = serde_json::from_value::( + serde_json::to_value(&val) + .expect("CanonicalJsonObj is a valid JsonValue"), + ) + .expect("all ruma pdus are conduit pdus"); + + // TODO: do we need this + assert_eq!(room_id, &prev_pdu.room_id); + + for id in &prev_pdu.prev_events { + prev_fork_ids.insert(id.clone()); + } + missing_fork.push(prev_pdu); + } + // We can't hard fail because there are some valid errors, just + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + // We have to give up on this PDU + continue 'outer; + } + }, } - }; + } + prev_ids.push(PrevEvents::new( + &prev_fork_ids.into_iter().collect::>(), + )); + missing.push(PrevEvents::new(&missing_fork)); + } + // All done finding missing events + None => { + break 'inner; } } } - // Now build up + // Now build up state let mut state_snapshot = seen .iter() .map(|(k, v)| (k.clone(), v.event_id.clone())) @@ -556,124 +634,222 @@ pub async fn send_transaction_message_route<'a>( .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) .collect::>>(); // TODO: this only accounts for sequentially missing events no holes will be filled - // and I'm still not sure what happens when fork introduces multiple `prev_events` + // and I'm still not sure what happens when a fork introduces multiple `prev_events` // // We need to go from oldest (furthest ancestor of the incoming event) to the // prev_event of the incoming event so we reverse the order oldest -> most recent for missing_pdu in missing.into_iter().rev() { - // For state events - if missing_pdu.state_key.is_some() { - let missing_pdu = missing_pdu.convert_for_state_res(); - match state_res::StateResolution::apply_event( - room_id, - &RoomVersionId::Version6, - missing_pdu.clone(), - &state_snapshot, - Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? - &db.rooms, - ) { - Ok(true) => { - // TODO: do we need this - assert_eq!(room_id, missing_pdu.room_id()); + match missing_pdu { + PrevEvents::Sequential(missing_pdu) => { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? 
+ &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // // Since we know the state from event to event we can do the + // // slightly more efficient force_state + // db.rooms.force_state( + // room_id, + // state_snapshot + // .iter() + // .map(|(k, v)| { + // ( + // k.clone(), + // serde_json::to_vec( + // &db.rooms + // .get_pdu(v) + // .expect("db err") + // .expect("we know of all the pdus"), + // ) + // .expect("serde can serialize pdus"), + // ) + // }) + // .collect(), + // )?; + + db.rooms + .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; + // Now append the new missing event to DB it will be part of the + // next events state_snapshot + db.rooms.append_pdu( + &PduEvent::from(&*missing_pdu), + &utils::to_canonical_object(&*missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + } + Ok(false) => { + error!( + "apply missing: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("{}", e); + // This is not a fatal error but we do eventually need to handle + // events failing that are not the incoming events + // TODO: what to do when missing events fail (not incoming events) + } + } + // All events between the event we know about and the incoming event need to be accounted + // for + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? 
+ { + error!("Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms + // + // I think we need to keep going until we reach + // the incoming pdu so do not error here + continue; + } let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); - // Since we know the state from event to event we can do the - // slightly more efficient force_state - db.rooms.force_state( - room_id, - state_snapshot - .iter() - .map(|(k, v)| { - ( - k.clone(), - serde_json::to_vec( - &db.rooms - .get_pdu(v) - .expect("db err") - .expect("we know of all the pdus"), - ) - .expect("serde can serialize pdus"), - ) - }) - .collect(), - )?; - // Now append the new missing event to DB it will be part of the - // next events state_snapshot + db.rooms.append_to_state(&pdu_id, &missing_pdu)?; db.rooms.append_pdu( - &PduEvent::from(&*missing_pdu), - &utils::to_canonical_object(&*missing_pdu) + &missing_pdu, + &utils::to_canonical_object(&missing_pdu) .expect("Pdu is valid canonical object"), count, - pdu_id.clone().into(), + pdu_id.into(), &db.globals, &db.account_data, &db.admin, )?; - - // Only after the state is recorded in the DB can we update the state_snapshot - // This will update the state snapshot so it is correct next loop through - state_snapshot.insert( - (missing_pdu.kind(), missing_pdu.state_key()), - missing_pdu.event_id(), - ); - accum_event_map.insert(missing_pdu.event_id(), missing_pdu); - } - Ok(false) => { - error!("{:?}", missing_pdu); - continue; - } - Err(e) => { - error!("{}", e); - // I think we need to keep going until we reach - // the incoming pdu so do not error here - continue; // The continue is not needed but to remind us that this is not an error + // Continue this inner for loop adding all the missing events } } - // All between the event we know about and the incoming event need to be accounted - // for - } else { - // TODO: Some auth needs to be done for non state events - if !db - .rooms - .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? - { - error!("Sender is not joined {}", missing_pdu.kind); - // I think we need to keep going until we reach - // the incoming pdu so do not error here - continue; + PrevEvents::Fork(pdus) => { + for missing_pdu in pdus.into_iter().rev() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? 
+ &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms + .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; + db.rooms.append_pdu( + &PduEvent::from(&*missing_pdu), + &utils::to_canonical_object(&*missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + + // Only after the state is recorded in the DB can we update the state_snapshot + // This will update the state snapshot so it is correct next loop through + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + } + Ok(false) => { + error!( + "apply missing fork: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("fork state-res: {}", e); + // TODO: what to do when missing events fail (not incoming events) + } + } + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? + { + error!("fork Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms + continue; + } + + let count = db.globals.next_count()?; + let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, &missing_pdu)?; + db.rooms.append_pdu( + &missing_pdu, + &utils::to_canonical_object(&missing_pdu) + .expect("Pdu is valid canonical object"), + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + // Continue this inner for loop adding all the missing events + } + } } - - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &missing_pdu)?; - db.rooms.append_pdu( - &missing_pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - // Continue this inner for loop adding all the missing events } } // Back to the original incoming event - // If it is a non state event we still must add it and create a statehash for it + // If it is a non state event we still must add it and associate a statehash with the pdu_id if value.get("state_key").is_none() { // TODO: Some auth needs to be done for non state events if !db.rooms.is_joined(&pdu.sender, room_id)? { error!("Sender is not joined {}", pdu.kind); - // I think we need to keep going until we reach - // the incoming pdu so do not error here + resolved_map.insert(event_id, Err("Sender not found in room".into())); continue; } @@ -700,6 +876,7 @@ pub async fn send_transaction_message_route<'a>( // the original incoming event is the youngest child and so can be simply authed and append // to the state // If we have holes or a fork I am less sure what can be guaranteed about our state? + // Or what must be done to fix holes and forks? 
match state_res::StateResolution::apply_event( room_id, &RoomVersionId::Version6, @@ -728,7 +905,10 @@ pub async fn send_transaction_message_route<'a>( } Ok(false) => { resolved_map.insert(event_id, Err("Failed event auth".into())); - error!("{:?}", pdu); + error!( + "auth failed: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); } Err(err) => { resolved_map.insert(event_id, Err(err.to_string())); From 8596dd7cd4bd91e3e0ab4b7d0a8cab60970a409d Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 14 Dec 2020 16:14:16 -0500 Subject: [PATCH 4/6] Remove find closest parent method, update state-res crate --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/database/rooms.rs | 58 ------------------------------------------- src/server_server.rs | 5 +--- 4 files changed, 3 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8fad68..a5cdedb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,7 +1986,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res#dca71f76eea9f1378a54e96e5fc98d71dfbf5dde" +source = "git+https://github.com/ruma/state-res?branch=conflict#e4ba824806e7b780c23ca8120b57c5a7e4ab787d" dependencies = [ "itertools", "maplit", diff --git a/Cargo.toml b/Cargo.toml index 1c97bdd..8d94770 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] } -state-res = { git = "https://github.com/ruma/state-res", features = ["unstable-pre-spec", "gen-eventid"] } +state-res = { git = "https://github.com/ruma/state-res", branch = "conflict", features = ["unstable-pre-spec", "gen-eventid"] } # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 9178eeb..394c7ea 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -36,16 +36,6 @@ use super::admin::AdminCommand; /// hashing the entire state. pub type StateHashId = IVec; -/// An enum that represents the two valid states when searching -/// for an events "parent". -/// -/// An events parent is any event we are aware of that is part of -/// the events `prev_events` array. -pub(crate) enum ClosestParent { - Append, - Insert(u64), -} - #[derive(Clone)] pub struct Rooms { pub edus: edus::RoomEdus, @@ -411,54 +401,6 @@ impl Rooms { } } - /// Recursively search for a PDU from our DB that is also in the - /// `prev_events` field of the incoming PDU. - /// - /// First we check if the last PDU inserted to the given room is a parent - /// if not we recursively check older `prev_events` to insert the incoming - /// event after. - pub(crate) fn get_latest_pduid_before( - &self, - room: &RoomId, - incoming_prev_ids: &[EventId], - their_state: &BTreeMap>, - ) -> Result> { - match self.pduid_pdu.scan_prefix(room.as_bytes()).last() { - Some(Ok(val)) - if incoming_prev_ids.contains( - &serde_json::from_slice::(&val.1) - .map_err(|_| { - Error::bad_database("last DB entry contains invalid PDU bytes") - })? - .event_id, - ) => - { - Ok(Some(ClosestParent::Append)) - } - _ => { - let mut prev_ids = incoming_prev_ids.to_vec(); - while let Some(id) = prev_ids.pop() { - match self.get_pdu_id(&id)? 
{ - Some(pdu_id) => { - return Ok(Some(ClosestParent::Insert(self.pdu_count(&pdu_id)?))); - } - None => { - prev_ids.extend(their_state.get(&id).map_or( - Err(Error::BadServerResponse( - "Failed to find previous event for PDU in state", - )), - // `prev_event_ids` will return an empty Vec instead of failing - // so it works perfect for our use here - |pdu| Ok(pdu.prev_event_ids()), - )?); - } - } - } - Ok(None) - } - } - } - /// Returns the leaf pdus of a room. pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result> { let mut prefix = room_id.as_bytes().to_vec(); diff --git a/src/server_server.rs b/src/server_server.rs index 71f731f..c4478d3 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,7 +1,4 @@ -use crate::{ - client_server, database::rooms::ClosestParent, utils, ConduitResult, Database, Error, PduEvent, - Result, Ruma, -}; +use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; use log::{error, warn}; From 558905bba487361ddd904788e7383db4ca9ebdda Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 14 Dec 2020 16:15:22 -0500 Subject: [PATCH 5/6] WIP: use resolve_incoming from state-res crate --- src/server_server.rs | 64 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index c4478d3..144ea0a 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -387,13 +387,25 @@ pub async fn get_public_rooms_route( .into()) } -#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum PrevEvents { +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum AuthEvents { Sequential(T), Fork(Vec), } -impl PrevEvents { +impl IntoIterator for AuthEvents { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + match self { + Self::Sequential(item) => vec![item].into_iter(), + Self::Fork(list) => list.into_iter(), + } + } +} + +impl AuthEvents { pub fn new(id: &[T]) -> Self { match id { [] => panic!("All events must have previous event"), @@ -490,15 +502,15 @@ pub async fn send_transaction_message_route<'a>( let mut missing = vec![]; let mut seen = state_res::StateMap::new(); - let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; + let mut prev_ids = vec![AuthEvents::new(&pdu.prev_events)]; // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue // in the case of a failed request to the server that sent the event 'inner: loop { - // TODO: if this is ever more that 1 at a time we must do actual + // TODO: if this is ever more than 1 at a time we must do actual // full state resolution not just auth match prev_ids.pop() { - Some(PrevEvents::Sequential(id)) => match db + Some(AuthEvents::Sequential(id)) => match db .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? 
{ @@ -528,8 +540,8 @@ pub async fn send_transaction_message_route<'a>( // TODO: do we need this assert_eq!(room_id, &prev_pdu.room_id); - prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); - missing.push(PrevEvents::Sequential(prev_pdu)); + prev_ids.push(AuthEvents::new(&prev_pdu.prev_events)); + missing.push(AuthEvents::Sequential(prev_pdu)); } // We can't hard fail because there are some valid errors, just // keep checking PDU's @@ -544,7 +556,7 @@ pub async fn send_transaction_message_route<'a>( }; } }, - Some(PrevEvents::Fork(ids)) => { + Some(AuthEvents::Fork(ids)) => { error!( "prev_events > 1: {}", serde_json::to_string_pretty(&pdu).unwrap() @@ -609,10 +621,10 @@ pub async fn send_transaction_message_route<'a>( }, } } - prev_ids.push(PrevEvents::new( + prev_ids.push(AuthEvents::new( &prev_fork_ids.into_iter().collect::>(), )); - missing.push(PrevEvents::new(&missing_fork)); + missing.push(AuthEvents::new(&missing_fork)); } // All done finding missing events None => { @@ -630,6 +642,32 @@ pub async fn send_transaction_message_route<'a>( .iter() .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) .collect::>>(); + + if !missing.is_empty() { + // This is the state at incoming pdu + state_snapshot = match state_res::StateResolution::resolve_incoming( + room_id, + &RoomVersionId::Version6, + &state_snapshot, + missing + .iter() + .cloned() + .flatten() + .filter(|pdu| pdu.state_key.is_some()) // remove non state events + .map(|pdu| ((pdu.kind, pdu.state_key.unwrap()), pdu.event_id)) + .collect(), + Some(accum_event_map), + &db.rooms, + ) { + Ok(res) => res, + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + continue; + } + } + } + // TODO: this only accounts for sequentially missing events no holes will be filled // and I'm still not sure what happens when a fork introduces multiple `prev_events` // @@ -637,7 +675,7 @@ pub async fn send_transaction_message_route<'a>( // prev_event of the incoming event so we reverse the order oldest -> most recent for missing_pdu in missing.into_iter().rev() { match missing_pdu { - PrevEvents::Sequential(missing_pdu) => { + AuthEvents::Sequential(missing_pdu) => { // For state events if missing_pdu.state_key.is_some() { let missing_pdu = missing_pdu.convert_for_state_res(); @@ -751,7 +789,7 @@ pub async fn send_transaction_message_route<'a>( // Continue this inner for loop adding all the missing events } } - PrevEvents::Fork(pdus) => { + AuthEvents::Fork(pdus) => { for missing_pdu in pdus.into_iter().rev() { // For state events if missing_pdu.state_key.is_some() { From 26afeb0c6dcfae9221c4fbecf8cc6810d30ad22d Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Thu, 17 Dec 2020 12:08:28 -0500 Subject: [PATCH 6/6] Resolve state when forked and keep track of state snapshots --- src/client_server/membership.rs | 2 +- src/database/rooms.rs | 53 +++ src/server_server.rs | 652 +++++++++++++++++++------------- 3 files changed, 453 insertions(+), 254 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 6d3a690..c713a9b 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -667,7 +667,7 @@ async fn join_room_by_id_helper( // this is a `state_res::StateEvent` that holds a `ruma::Pdu` let pdu = event_map .get(ev_id) - .expect("Found event_id in sorted events that is not in resolved state"); + .expect("found event_id in sorted events that is not in resolved state"); // We do not rebuild the PDU in this case only insert to DB let count 
= db.globals.next_count()?; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 394c7ea..4542c65 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -135,6 +135,29 @@ impl Rooms { .collect::>>() } + /// Returns all state entries we know of from a specific event_id onward. + /// The PDU's are ordered most recent to least recent. + /// + /// This is used to resolve a forward extremity. We have the other servers + /// arm now we need ours. + /// TODO: Probably return Vec> + pub fn state_from(&self, room_id: &RoomId, event_id: &EventId) -> Result> { + let mut last_ids = self.get_pdu_leaves(room_id)?; + let mut collected = vec![]; + while let Some(find_id) = last_ids.pop() { + if event_id == &find_id { + break; + } + + if let Some(pdu) = self.get_pdu(&find_id)? { + last_ids.extend(pdu.prev_events.to_vec()); + collected.push(pdu); + } + } + + Ok(collected) + } + /// Returns all state entries for this type. pub fn state_type( &self, @@ -253,6 +276,36 @@ impl Rooms { .is_some()) } + /// Force the creation of a new StateHash and insert it into the db. This also associates + /// the given `pdu` with the new StateHash. + /// + /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. + pub fn force_state_with_pdu( + &self, + new_pdu_id: &[u8], + room_id: &RoomId, + state: HashMap<(EventType, String), Vec>, + ) -> Result<()> { + let state_hash = + self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::>())?; + let mut prefix = state_hash.to_vec(); + prefix.push(0xff); + + for ((event_type, state_key), pdu_id) in state { + let mut state_id = prefix.clone(); + state_id.extend_from_slice(&event_type.as_ref().as_bytes()); + state_id.push(0xff); + state_id.extend_from_slice(&state_key.as_bytes()); + self.stateid_pduid.insert(state_id, pdu_id.to_vec())?; + } + + self.pduid_statehash.insert(new_pdu_id, &*state_hash)?; + self.roomid_statehash + .insert(room_id.as_bytes(), &*state_hash)?; + + Ok(()) + } + /// Force the creation of a new StateHash and insert it into the db. /// /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. diff --git a/src/server_server.rs b/src/server_server.rs index 144ea0a..e271891 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -388,12 +388,12 @@ pub async fn get_public_rooms_route( } #[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum AuthEvents { +pub enum PrevEvents { Sequential(T), Fork(Vec), } -impl IntoIterator for AuthEvents { +impl IntoIterator for PrevEvents { type Item = T; type IntoIter = std::vec::IntoIter; @@ -405,7 +405,7 @@ impl IntoIterator for AuthEvents { } } -impl AuthEvents { +impl PrevEvents { pub fn new(id: &[T]) -> Self { match id { [] => panic!("All events must have previous event"), @@ -478,15 +478,15 @@ pub async fn send_transaction_message_route<'a>( } let pdu = &body.pdus[pdu_idx]; pdu_idx += 1; - // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. + // Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped. // state-res checks signatures - 2. Passes signature checks, otherwise event is dropped. - // 3. Passes hash checks, otherwise it is redacted before being processed further. 
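The PrevEvents helper (renamed back from AuthEvents in the hunk above) is what drives the catch-up loop that follows. A simplified standalone illustration of how it classifies an event's prev_events, assuming the elided match arms map a single id to Sequential and anything more to Fork:

    #[derive(Clone, Debug, PartialEq)]
    enum PrevEvents<T> {
        Sequential(T),
        Fork(Vec<T>),
    }

    impl<T: Clone> PrevEvents<T> {
        fn new(ids: &[T]) -> Self {
            match ids {
                // Mirrors the panic in the patch: every PDU must reference a parent.
                [] => panic!("All events must have previous event"),
                [single] => Self::Sequential(single.clone()),
                many => Self::Fork(many.to_vec()),
            }
        }
    }

    fn main() {
        // One prev_event: the incoming event extends the timeline sequentially.
        assert_eq!(PrevEvents::new(&["$a"]), PrevEvents::Sequential("$a"));
        // Several prev_events: the incoming event merges a fork.
        assert_eq!(PrevEvents::new(&["$a", "$b"]), PrevEvents::Fork(vec!["$a", "$b"]));
    }

The IntoIterator impl then lets a Fork be flattened the same way a single Sequential event is, which is what the missing.iter().cloned().flatten() chains further down rely on.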
+ // TODO: redact event if hashing fails let (event_id, value) = crate::pdu::process_incoming_pdu(pdu); - let pdu = serde_json::from_value::( + let mut pdu = serde_json::from_value::( serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"), ) .expect("all ruma pdus are conduit pdus"); @@ -501,23 +501,31 @@ pub async fn send_transaction_message_route<'a>( // The events that must be resolved to catch up to the incoming event let mut missing = vec![]; let mut seen = state_res::StateMap::new(); + let mut seen_id = None; - let mut prev_ids = vec![AuthEvents::new(&pdu.prev_events)]; + let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)]; // This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue // in the case of a failed request to the server that sent the event 'inner: loop { - // TODO: if this is ever more than 1 at a time we must do actual - // full state resolution not just auth match prev_ids.pop() { - Some(AuthEvents::Sequential(id)) => match db + Some(PrevEvents::Sequential(id)) => match db .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { - // We know the state snapshot for this events parents so we can simply auth the - // incoming event and append to DB and append to state if it passes + // We found a common ancestor Some(state_hash) => { + seen_id = Some(id.clone()); seen = db.rooms.state_full(&state_hash)?; + if let Some(pdu) = db.rooms.get_pdu(&id)? { + if pdu.state_key.is_some() { + // This becomes the state after the common event + seen.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu, + ); + } + } break 'inner; } // We need to fill in information about this event's `prev_events` (parents) @@ -540,8 +548,8 @@ pub async fn send_transaction_message_route<'a>( // TODO: do we need this assert_eq!(room_id, &prev_pdu.room_id); - prev_ids.push(AuthEvents::new(&prev_pdu.prev_events)); - missing.push(AuthEvents::Sequential(prev_pdu)); + prev_ids.push(PrevEvents::new(&prev_pdu.prev_events)); + missing.push(PrevEvents::Sequential(prev_pdu)); } // We can't hard fail because there are some valid errors, just // keep checking PDU's @@ -556,7 +564,7 @@ pub async fn send_transaction_message_route<'a>( }; } }, - Some(AuthEvents::Fork(ids)) => { + Some(PrevEvents::Fork(ids)) => { error!( "prev_events > 1: {}", serde_json::to_string_pretty(&pdu).unwrap() @@ -583,10 +591,19 @@ pub async fn send_transaction_message_route<'a>( .rooms .pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())? { - // We know the state snapshot for this events parents so we can simply auth the - // incoming event and append to DB and append to state if it passes + // We found a common ancestor Some(state_hash) => { + seen_id = Some(id.clone()); seen = db.rooms.state_full(&state_hash)?; + if let Some(pdu) = db.rooms.get_pdu(&id)? 
{ + if pdu.state_key.is_some() { + // This becomes the state after the common event + seen.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu, + ); + } + } break 'inner; } None => match send_request( @@ -621,10 +638,10 @@ pub async fn send_transaction_message_route<'a>( }, } } - prev_ids.push(AuthEvents::new( + prev_ids.push(PrevEvents::new( &prev_fork_ids.into_iter().collect::>(), )); - missing.push(AuthEvents::new(&missing_fork)); + missing.push(PrevEvents::new(&missing_fork)); } // All done finding missing events None => { @@ -633,49 +650,94 @@ pub async fn send_transaction_message_route<'a>( } } + // We can treat this event as sequential and simply apply it against the current state of the room + // because we know that state + if missing.is_empty() { + // Back to the original incoming event + // If it is a non state event we still must add it and associate a statehash with the pdu_id + if value.get("state_key").is_none() { + // TODO: Some auth needs to be done for non state events + if !db.rooms.is_joined(&pdu.sender, room_id)? { + error!("Sender is not joined {}", pdu.kind); + resolved_map.insert(event_id, Err("Sender not found in room".into())); + continue 'outer; + } + + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue 'outer; + } else { + let incoming = pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + incoming.clone(), + &seen + .iter() + .map(|(k, v)| (k.clone(), v.event_id.clone())) + .collect::>(), + Some( + seen.iter() + .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) + .collect::>(), + ), // TODO: make mut and keep around, this is all the auth events + &db.rooms, + ) { + Ok(true) => { + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue 'outer; + } + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".to_string())); + error!("Failed sequential event auth for incoming"); + continue 'outer; + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + continue 'outer; + } + } + } + } + + // Well, now we have to actually do a bunch of work :( + // The steps are as follows + // 1. Rebuild the sending servers forward extremity, ignoring our own fork + // a) iterate "oldest" -> most recent authenticating each event with the state after the previous + // b) build a `snapshot_map` containing the state after the event for each missing event (EventId -> StateMap) + // 2. Build our side of the fork (TODO do we have to re-auth these, is state at an event relative to the server its from) + // 3. 
resolve the two states (our current with the state after the most recent missing event) + // Now build up state let mut state_snapshot = seen .iter() .map(|(k, v)| (k.clone(), v.event_id.clone())) - .collect(); - let mut accum_event_map = seen + .collect::>(); + + // TODO: So this is super memory inefficient we clone the state_snapshot every time + // we need it for state events and non + let mut snapshot_map = BTreeMap::new(); + snapshot_map.insert(seen_id.clone().unwrap(), state_snapshot.clone()); + + let accum_event_map = seen .iter() .map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res())) - .collect::>>(); - - if !missing.is_empty() { - // This is the state at incoming pdu - state_snapshot = match state_res::StateResolution::resolve_incoming( - room_id, - &RoomVersionId::Version6, - &state_snapshot, + .chain( missing .iter() .cloned() .flatten() - .filter(|pdu| pdu.state_key.is_some()) // remove non state events - .map(|pdu| ((pdu.kind, pdu.state_key.unwrap()), pdu.event_id)) - .collect(), - Some(accum_event_map), - &db.rooms, - ) { - Ok(res) => res, - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - error!("{}", err); - continue; - } - } - } + .map(|pdu| (pdu.event_id.clone(), pdu.convert_for_state_res())), + ) + .collect::>>(); - // TODO: this only accounts for sequentially missing events no holes will be filled - // and I'm still not sure what happens when a fork introduces multiple `prev_events` - // - // We need to go from oldest (furthest ancestor of the incoming event) to the - // prev_event of the incoming event so we reverse the order oldest -> most recent - for missing_pdu in missing.into_iter().rev() { + // 4. Passes authorization rules based on the event's auth events, otherwise it is rejected. + // 5. Passes authorization rules based on the state at the event, otherwise it is rejected. 
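One way to tackle the memory-inefficiency TODO above (cloning state_snapshot for every missing event) would be to share immutable snapshots and only copy when a state event actually changes the map. A rough sketch of that idea using simplified stand-in types rather than the patch's real ruma/state-res types:

    use std::{collections::BTreeMap, sync::Arc};

    // Simplified stand-ins for the real types.
    type EventId = String;
    type StateKey = (String, String); // (event type, state_key)
    type StateMap = BTreeMap<StateKey, EventId>;

    struct SnapshotTracker {
        current: Arc<StateMap>,
        snapshot_map: BTreeMap<EventId, Arc<StateMap>>,
    }

    impl SnapshotTracker {
        fn new(initial: StateMap) -> Self {
            Self {
                current: Arc::new(initial),
                snapshot_map: BTreeMap::new(),
            }
        }

        // Non-state events leave the room state untouched: record a cheap Arc clone.
        fn record_message_event(&mut self, event_id: EventId) {
            self.snapshot_map.insert(event_id, Arc::clone(&self.current));
        }

        // State events produce a new snapshot: the copy happens only here.
        fn record_state_event(&mut self, key: StateKey, event_id: EventId) {
            let mut next = (*self.current).clone();
            next.insert(key, event_id.clone());
            self.current = Arc::new(next);
            self.snapshot_map.insert(event_id, Arc::clone(&self.current));
        }
    }

Lookups like snapshot_map.get(&pdu.event_id) would then hand back a shared Arc instead of requiring a full map copy per event.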
+ for missing_pdu in missing.iter().rev() { match missing_pdu { - AuthEvents::Sequential(missing_pdu) => { + PrevEvents::Sequential(missing_pdu) => { // For state events if missing_pdu.state_key.is_some() { let missing_pdu = missing_pdu.convert_for_state_res(); @@ -691,46 +753,9 @@ pub async fn send_transaction_message_route<'a>( // TODO: do we need this assert_eq!(room_id, missing_pdu.room_id()); - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - // // Since we know the state from event to event we can do the - // // slightly more efficient force_state - // db.rooms.force_state( - // room_id, - // state_snapshot - // .iter() - // .map(|(k, v)| { - // ( - // k.clone(), - // serde_json::to_vec( - // &db.rooms - // .get_pdu(v) - // .expect("db err") - // .expect("we know of all the pdus"), - // ) - // .expect("serde can serialize pdus"), - // ) - // }) - // .collect(), - // )?; - - db.rooms - .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; - // Now append the new missing event to DB it will be part of the - // next events state_snapshot - db.rooms.append_pdu( - &PduEvent::from(&*missing_pdu), - &utils::to_canonical_object(&*missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.clone().into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + // We can't add to DB yet since we don't know if it passes + // current room state + // append_state(&db, &PduEvent::from(&*missing_pdu))?; // Only after the state is recorded in the DB can we update the state_snapshot // This will update the state snapshot so it is correct next loop through @@ -738,7 +763,8 @@ pub async fn send_transaction_message_route<'a>( (missing_pdu.kind(), missing_pdu.state_key()), missing_pdu.event_id(), ); - accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + // Keep track of the state after for resolution + snapshot_map.insert(missing_pdu.event_id(), state_snapshot.clone()); } Ok(false) => { error!( @@ -754,200 +780,302 @@ pub async fn send_transaction_message_route<'a>( // TODO: what to do when missing events fail (not incoming events) } } - // All events between the event we know about and the incoming event need to be accounted - // for } else { // TODO: Some auth needs to be done for non state events if !db .rooms .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? { - error!("Sender is not joined {}", missing_pdu.kind); + error!("fork Sender is not joined {}", missing_pdu.kind); // TODO: we probably should not be getting events for different rooms - // - // I think we need to keep going until we reach - // the incoming pdu so do not error here continue; } - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &missing_pdu)?; - db.rooms.append_pdu( - &missing_pdu, - &utils::to_canonical_object(&missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + // TODO: a better way to signal non state events... 
+ snapshot_map.insert(missing_pdu.event_id.clone(), state_snapshot.clone()); // Continue this inner for loop adding all the missing events } } - AuthEvents::Fork(pdus) => { - for missing_pdu in pdus.into_iter().rev() { - // For state events - if missing_pdu.state_key.is_some() { - let missing_pdu = missing_pdu.convert_for_state_res(); - match state_res::StateResolution::apply_event( - room_id, - &RoomVersionId::Version6, - missing_pdu.clone(), - &state_snapshot, - Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? - &db.rooms, - ) { - Ok(true) => { - // TODO: do we need this - assert_eq!(room_id, missing_pdu.room_id()); - - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id().as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms - .append_to_state(&pdu_id, &PduEvent::from(&*missing_pdu))?; - db.rooms.append_pdu( - &PduEvent::from(&*missing_pdu), - &utils::to_canonical_object(&*missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.clone().into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - - // Only after the state is recorded in the DB can we update the state_snapshot - // This will update the state snapshot so it is correct next loop through - state_snapshot.insert( - (missing_pdu.kind(), missing_pdu.state_key()), - missing_pdu.event_id(), - ); - accum_event_map.insert(missing_pdu.event_id(), missing_pdu); + PrevEvents::Fork(pdus) => { + let mut state_sets = BTreeSet::new(); + for pdu in pdus { + let mut state_at = state_snapshot.clone(); + if pdu.state_key.is_some() { + state_at.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu.event_id.clone(), + ); + } + state_sets.insert(state_at); + } + + if state_sets.len() <= 1 { + for missing_pdu in pdus.iter() { + // For state events + if missing_pdu.state_key.is_some() { + let missing_pdu = missing_pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + missing_pdu.clone(), + &state_snapshot, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + // TODO: do we need this + assert_eq!(room_id, missing_pdu.room_id()); + + state_snapshot.insert( + (missing_pdu.kind(), missing_pdu.state_key()), + missing_pdu.event_id(), + ); + snapshot_map + .insert(missing_pdu.event_id(), state_snapshot.clone()); + } + Ok(false) => { + error!( + "apply missing fork: {}", + serde_json::to_string_pretty(&*missing_pdu).unwrap() + ); + continue; + } + Err(e) => { + error!("fork state-res: {}", e); + // TODO: what to do when missing events fail (not incoming events) + } } - Ok(false) => { - error!( - "apply missing fork: {}", - serde_json::to_string_pretty(&*missing_pdu).unwrap() - ); + } else { + // TODO: Some auth needs to be done for non state events + if !db + .rooms + .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? 
+ { + error!("fork Sender is not joined {}", missing_pdu.kind); + // TODO: we probably should not be getting events for different rooms continue; } - Err(e) => { - error!("fork state-res: {}", e); - // TODO: what to do when missing events fail (not incoming events) + snapshot_map + .insert(missing_pdu.event_id.clone(), state_snapshot.clone()); + // Continue this inner for loop adding all the missing events + } + } + } else { + match state_res::StateResolution::resolve( + room_id, + &RoomVersionId::Version6, + &state_sets.into_iter().collect::>(), + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(resolved) => { + for id in resolved + .values() + .filter(|id| pdus.iter().any(|pduid| pduid.event_id == **id)) + { + snapshot_map.insert(id.clone(), resolved.clone()); } } - } else { - // TODO: Some auth needs to be done for non state events - if !db - .rooms - .is_joined(&missing_pdu.sender, &missing_pdu.room_id)? - { - error!("fork Sender is not joined {}", missing_pdu.kind); - // TODO: we probably should not be getting events for different rooms - continue; + Err(err) => { + error!("{}", err); } - - let count = db.globals.next_count()?; - let mut pdu_id = missing_pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &missing_pdu)?; - db.rooms.append_pdu( - &missing_pdu, - &utils::to_canonical_object(&missing_pdu) - .expect("Pdu is valid canonical object"), - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - // Continue this inner for loop adding all the missing events } } } } } - // Back to the original incoming event - // If it is a non state event we still must add it and associate a statehash with the pdu_id - if value.get("state_key").is_none() { - // TODO: Some auth needs to be done for non state events - if !db.rooms.is_joined(&pdu.sender, room_id)? { - error!("Sender is not joined {}", pdu.kind); - resolved_map.insert(event_id, Err("Sender not found in room".into())); - continue; + // We know these events are valid for each state at the event + // we have already authed them + let known_fork = if let Some(id) = seen_id { + db.rooms.state_from(room_id, &id)? 
+ } else { + vec![] + }; + for pdu in known_fork.clone().into_iter().rev() { + if pdu.state_key.is_some() { + seen.insert((pdu.kind.clone(), pdu.state_key.clone().unwrap()), pdu); } + } - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - resolved_map.insert(event_id, Ok::<(), String>(())); - continue; + let current_room_state = seen + .into_iter() + .map(|(k, v)| (k, v.event_id)) + .collect::>(); + + let mut state_sets = BTreeSet::new(); + state_sets.insert(current_room_state.clone()); + + // The first item is the most recent + if let Some(fork) = missing.first() { + for pdu in fork.clone().into_iter() { + if let Some(map) = snapshot_map.get(&pdu.event_id) { + state_sets.insert(map.clone()); + } + } } + let mut state_sets = state_sets.into_iter().collect::>(); + // If we have actual differences resolve + if state_sets.len() > 1 { + // Set the incoming event to have parents from both forks + // ours/theirs + pdu.prev_events = missing + .first() + .cloned() + .into_iter() + .flatten() + .map(|pdu| pdu.event_id) + .chain(known_fork.first().cloned().map(|pdu| pdu.event_id)) + .collect(); + + // The incoming event is a child of both forks now + for set in &mut state_sets { + set.insert( + (pdu.kind.clone(), pdu.state_key.clone().unwrap()), + pdu.event_id.clone(), + ); + } + + // If we have holes or a fork I am less sure what can be guaranteed about our state? + // Or what must be done to fix holes and forks? + match state_res::StateResolution::resolve( + room_id, + &RoomVersionId::Version6, + &state_sets, + Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? 
+ &db.rooms, + ) { + Ok(resolved) if resolved.values().any(|id| id == &event_id) => { + for resolved_missing in missing + .into_iter() + .rev() // We want the oldest pdu's first + .flatten() + .filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id)) + { + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( + &resolved_missing, + &crate::utils::to_canonical_object(&resolved_missing) + .expect("PDU is valid canonical JSON"), + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + } + + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + // Since we know the state we force_state + // saving the incoming pdu's id with our new state + db.rooms.force_state_with_pdu( + &*pdu_id, + room_id, + resolved + .iter() + .map(|(k, v)| { + ( + k.clone(), + serde_json::to_vec( + &db.rooms + .get_pdu(v) + .expect("db err") + .expect("we know of all the pdus"), + ) + .expect("serde can serialize pdus"), + ) + }) + .collect(), + )?; + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + resolved_map.insert(event_id, Ok::<(), String>(())); + } + Ok(resolved) => { + for resolved_missing in missing + .into_iter() + .flatten() + .filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id)) + { + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( + &resolved_missing, + &crate::utils::to_canonical_object(&resolved_missing) + .expect("PDU is valid canonical JSON"), + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.admin, + )?; + } + resolved_map.insert(event_id, Err("Failed event auth".into())); + error!( + "auth failed: {}", + serde_json::to_string_pretty(&pdu).unwrap() + ); + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + } + } // If we have iterated through the incoming missing events sequentially we know that - // the original incoming event is the youngest child and so can be simply authed and append + // the original incoming event is the youngest child and so can be simply authed and appended // to the state - // If we have holes or a fork I am less sure what can be guaranteed about our state? - // Or what must be done to fix holes and forks? - match state_res::StateResolution::apply_event( - room_id, - &RoomVersionId::Version6, - pdu.convert_for_state_res(), // We know this a state event - &state_snapshot, - Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ? - &db.rooms, - ) { - Ok(true) => { - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + } else { + // If it is a non state event we still must add it and associate a statehash with the pdu_id + if pdu.state_key.is_none() { + // TODO: Some auth needs to be done for non state events + if !db.rooms.is_joined(&pdu.sender, room_id)? 
{ + error!("Sender is not joined {}", pdu.kind); + resolved_map.insert(event_id, Err("Sender not found in room".into())); + continue 'outer; + } + + append_state(&db, &pdu)?; resolved_map.insert(event_id, Ok::<(), String>(())); - } - Ok(false) => { - resolved_map.insert(event_id, Err("Failed event auth".into())); - error!( - "auth failed: {}", - serde_json::to_string_pretty(&pdu).unwrap() - ); - } - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - error!("{}", err); + } else { + let incoming = pdu.convert_for_state_res(); + match state_res::StateResolution::apply_event( + room_id, + &RoomVersionId::Version6, + incoming, + ¤t_room_state, + Some(accum_event_map), // TODO: make mut and check on Ok(..) ? + &db.rooms, + ) { + Ok(true) => { + append_state(&db, &pdu)?; + resolved_map.insert(event_id, Ok::<(), String>(())); + } + Ok(false) => { + resolved_map.insert(event_id, Err("Failed event auth".to_string())); + error!("Failed sequential event auth for incoming"); + } + Err(err) => { + resolved_map.insert(event_id, Err(err.to_string())); + error!("{}", err); + } + } } } } @@ -955,6 +1083,24 @@ pub async fn send_transaction_message_route<'a>( Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) } +fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { + let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_to_state(&pdu_id, pdu)?; + db.rooms.append_pdu( + pdu, + &utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + ) +} + #[cfg_attr( feature = "conduit_bin", post("/_matrix/federation/v1/get_missing_events/<_>", data = "")