diff --git a/Cargo.lock b/Cargo.lock index 51ccff7..31bb645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -187,7 +187,7 @@ dependencies = [ "log", "opentelemetry", "opentelemetry-jaeger", - "rand 0.8.3", + "rand", "regex", "reqwest", "ring", @@ -780,9 +780,9 @@ checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135" [[package]] name = "itertools" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +checksum = "37d572918e350e82412fe766d24b15e6682fb2ed2bbe018280caa810397cb319" dependencies = [ "either", ] @@ -810,9 +810,9 @@ dependencies = [ [[package]] name = "js_int" -version = "0.1.9" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b96797f53235a1d6dc985f244a69de54b04c45b7e0e357a35c85a45a847d92f2" +checksum = "fcae89e078a96b781b38f36225bb3a174b8f6e905dfec550dd16a13539c82acc" dependencies = [ "serde", ] @@ -1106,7 +1106,7 @@ dependencies = [ "lazy_static", "percent-encoding", "pin-project", - "rand 0.8.3", + "rand", "thiserror", ] @@ -1317,19 +1317,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc 0.2.0", -] - [[package]] name = "rand" version = "0.8.3" @@ -1337,19 +1324,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", - "rand_chacha 0.3.0", - "rand_core 0.6.2", - "rand_hc 0.3.0", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", + "rand_chacha", + "rand_core", + "rand_hc", ] [[package]] @@ -1359,16 +1336,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.2", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", + "rand_core", ] [[package]] @@ -1380,22 +1348,13 @@ dependencies = [ "getrandom 0.2.2", ] -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] - [[package]] name = "rand_hc" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.2", + "rand_core", ] [[package]] @@ -1556,7 +1515,7 @@ dependencies = [ "memchr", "num_cpus", "parking_lot", - "rand 0.8.3", + "rand", "ref-cast", "rocket_codegen", "rocket_http", @@ -1610,8 +1569,8 @@ dependencies = [ [[package]] name = "ruma" -version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.0.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "assign", "js_int", @@ -1622,14 +1581,16 @@ dependencies = [ "ruma-events", "ruma-federation-api", "ruma-identifiers", + "ruma-identity-service-api", + "ruma-push-gateway-api", "ruma-serde", "ruma-signatures", ] [[package]] name = "ruma-api" -version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.17.0-alpha.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "http", "percent-encoding", @@ -1643,8 +1604,8 @@ dependencies = [ [[package]] name = "ruma-api-macros" -version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.17.0-alpha.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1654,8 +1615,8 @@ dependencies = [ [[package]] name = "ruma-appservice-api" -version = "0.2.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.2.0-alpha.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "ruma-api", "ruma-common", @@ -1668,8 +1629,8 @@ dependencies = [ [[package]] name = "ruma-client-api" -version = "0.10.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.10.0-alpha.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "assign", "http", @@ -1687,8 +1648,8 @@ dependencies = [ [[package]] name = "ruma-common" -version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.3.0-alpha.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "js_int", "maplit", @@ -1700,8 +1661,8 @@ dependencies = [ [[package]] name = "ruma-events" -version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.22.0-alpha.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "js_int", "ruma-common", @@ -1714,8 +1675,8 @@ dependencies = [ [[package]] name = "ruma-events-macros" -version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.22.0-alpha.2" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1725,8 +1686,8 @@ dependencies = [ [[package]] name = "ruma-federation-api" -version = "0.0.3" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.1.0-alpha.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "js_int", "ruma-api", @@ -1740,22 +1701,22 @@ dependencies = [ [[package]] name = "ruma-identifiers" -version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.18.0-alpha.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "paste", - "rand 0.7.3", + "rand", "ruma-identifiers-macros", "ruma-identifiers-validation", "ruma-serde", + "ruma-serde-macros", "serde", - "strum", ] [[package]] name = "ruma-identifiers-macros" -version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.18.0-alpha.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "proc-macro2", "quote", @@ -1765,16 +1726,41 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" -version = "0.1.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.2.0" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" + +[[package]] +name = "ruma-identity-service-api" +version = "0.0.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ + "ruma-api", + "ruma-common", + "ruma-identifiers", + "ruma-serde", + "serde", + "serde_json", +] + +[[package]] +name = "ruma-push-gateway-api" +version = "0.0.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" +dependencies = [ + "js_int", + "ruma-api", + "ruma-common", + "ruma-events", + "ruma-identifiers", + "ruma-serde", "serde", + "serde_json", ] [[package]] name = "ruma-serde" -version = "0.2.3" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "form_urlencoded", "itoa", @@ -1786,8 +1772,8 @@ dependencies = [ [[package]] name = "ruma-serde-macros" -version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1797,10 +1783,10 @@ dependencies = [ [[package]] name = "ruma-signatures" -version = "0.6.0-dev.1" -source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" +version = "0.6.0-alpha.1" +source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" dependencies = [ - "base64 0.12.3", + "base64 0.13.0", "ring", "ruma-identifiers", "ruma-serde", @@ -2065,15 +2051,15 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#a1c15253f0777baad251da47c3f2c016cfed6f7e" +source = "git+https://github.com/ruma/state-res?rev=d34a78c5b66de419862d9e592bde8e0007111ebd#d34a78c5b66de419862d9e592bde8e0007111ebd" dependencies = [ "itertools", + "log", "maplit", "ruma", "serde", "serde_json", "thiserror", - "tracing", ] [[package]] @@ -2125,27 +2111,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" -[[package]] -name = "strum" -version = "0.19.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b89a286a7e3b5720b9a477b23253bc50debac207c8d21505f8e70b36792f11b5" -dependencies = [ - "strum_macros", -] - -[[package]] -name = "strum_macros" -version = "0.19.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e61bb0be289045cb80bfce000512e32d09f8337e54c186725da381377ad1f8d5" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "syn" version = "1.0.60" @@ -2165,7 +2130,7 @@ checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ "cfg-if", "libc", - "rand 0.8.3", + "rand", "redox_syscall 0.2.5", "remove_dir_all", "winapi", @@ -2470,7 +2435,7 @@ dependencies = [ "ipnet", "lazy_static", "log", - "rand 0.8.3", + "rand", "smallvec", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 8addf50..a45cba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,14 +18,13 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "93e62c86e #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "ee814aa84934530d76f5e4b275d739805b49bdef" } -# ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "unstable-join" } +ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" } +# ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } -state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } -#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } +state-res = { git = "https://github.com/ruma/state-res", rev = "d34a78c5b66de419862d9e592bde8e0007111ebd", features = ["unstable-pre-spec", "gen-eventid"] }#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio tokio = "1.2.0" diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 044468b..1fa355d 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -241,11 +241,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 2. Make conduit bot join @@ -266,11 +262,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 3. Power levels @@ -304,11 +296,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 4.1 Join Rules @@ -325,11 +313,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 4.2 History Visibility @@ -348,11 +332,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 4.3 Guest Access @@ -369,11 +349,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 6. Events implied by name and topic @@ -392,11 +368,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.rooms.build_and_append_pdu( @@ -412,11 +384,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Room alias @@ -438,11 +406,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; @@ -465,11 +429,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.rooms.build_and_append_pdu( PduBuilder { @@ -488,11 +448,7 @@ pub async fn register_route( }, &user_id, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Send welcome message @@ -517,11 +473,7 @@ pub async fn register_route( }, &conduit_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } @@ -696,11 +648,7 @@ pub async fn deactivate_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } @@ -716,3 +664,17 @@ pub async fn deactivate_route( } .into()) } + +/*/ +#[cfg_attr( + feature = "conduit_bin", + get("/_matrix/client/r0/account/3pid", data = "") +)] +pub async fn third_party_route( + body: Ruma>, +) -> ConduitResult { + let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + + Ok(account::add_3pid::Response::default().into()) +} +*/ diff --git a/src/client_server/alias.rs b/src/client_server/alias.rs index 0a8ad08..03d4909 100644 --- a/src/client_server/alias.rs +++ b/src/client_server/alias.rs @@ -74,7 +74,7 @@ pub async fn get_alias_helper( .sending .send_federation_request( &db.globals, - room_alias.server_name().to_owned(), + room_alias.server_name(), federation::query::get_room_information::v1::Request { room_alias }, ) .await?; diff --git a/src/client_server/directory.rs b/src/client_server/directory.rs index 0dadde9..ae70ec5 100644 --- a/src/client_server/directory.rs +++ b/src/client_server/directory.rs @@ -141,7 +141,7 @@ pub async fn get_public_rooms_filtered_helper( .sending .send_federation_request( &db.globals, - other_server.to_owned(), + other_server, federation::directory::get_public_rooms_filtered::v1::Request { limit, since: since.as_deref(), diff --git a/src/client_server/media.rs b/src/client_server/media.rs index 2db4fc6..57fc2b0 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -80,7 +80,7 @@ pub async fn get_content_route( .sending .send_federation_request( &db.globals, - body.server_name.clone(), + &body.server_name, get_content::Request { allow_remote: false, server_name: &body.server_name, @@ -130,12 +130,12 @@ pub async fn get_content_thumbnail_route( .sending .send_federation_request( &db.globals, - body.server_name.clone(), + &body.server_name, get_content_thumbnail::Request { allow_remote: false, height: body.height, width: body.width, - method: body.method, + method: body.method.clone(), server_name: &body.server_name, media_id: &body.media_id, }, diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 287cfbb..2a10c81 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -21,7 +21,7 @@ use ruma::{ serde::{to_canonical_value, CanonicalJsonObject, Raw}, EventId, RoomId, RoomVersionId, ServerName, UserId, }; -use state_res::StateEvent; +use state_res::Event; use std::{ collections::{BTreeMap, HashMap, HashSet}, convert::TryFrom, @@ -127,11 +127,7 @@ pub async fn leave_room_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; @@ -168,11 +164,7 @@ pub async fn invite_user_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; @@ -225,11 +217,7 @@ pub async fn kick_user_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; @@ -286,11 +274,7 @@ pub async fn ban_user_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; @@ -339,11 +323,7 @@ pub async fn unban_user_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; @@ -479,7 +459,7 @@ async fn join_room_by_id_helper( .sending .send_federation_request( &db.globals, - remote_server.clone(), + remote_server, federation::membership::create_join_event_template::v1::Request { room_id, user_id: sender_user, @@ -558,7 +538,7 @@ async fn join_room_by_id_helper( .sending .send_federation_request( &db.globals, - remote_server.clone(), + remote_server, federation::membership::create_join_event::v2::Request { room_id, event_id: &event_id, @@ -605,19 +585,19 @@ async fn join_room_by_id_helper( .chain(iter::once(Ok((event_id, join_event)))) // Add join event we just created .map(|r| { let (event_id, value) = r?; - state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone()) + PduEvent::from_id_val(&event_id, value.clone()) .map(|ev| (event_id, Arc::new(ev))) .map_err(|e| { warn!("{:?}: {}", value, e); Error::BadServerResponse("Invalid PDU in send_join response.") }) }) - .collect::>>>()?; + .collect::>>>()?; let control_events = event_map .values() - .filter(|pdu| pdu.is_power_event()) - .map(|pdu| pdu.event_id()) + .filter(|pdu| state_res::is_power_event(pdu)) + .map(|pdu| pdu.event_id.clone()) .collect::>(); // These events are not guaranteed to be sorted but they are resolved according to spec @@ -629,7 +609,6 @@ async fn join_room_by_id_helper( &room_id, &control_events, &mut event_map, - &db.rooms, &event_ids, ); @@ -640,7 +619,6 @@ async fn join_room_by_id_helper( &sorted_control_events, &BTreeMap::new(), // We have no "clean/resolved" events to add (these extend the `resolved_control_events`) &mut event_map, - &db.rooms, ) .expect("iterative auth check failed on resolved events"); @@ -657,14 +635,14 @@ async fn join_room_by_id_helper( .cloned() .collect::>(); - let power_level = resolved_control_events.get(&(EventType::RoomPowerLevels, "".into())); + let power_level = + resolved_control_events.get(&(EventType::RoomPowerLevels, Some("".into()))); // Sort the remaining non control events let sorted_event_ids = state_res::StateResolution::mainline_sort( room_id, &events_to_sort, power_level, &mut event_map, - &db.rooms, ); let resolved_events = state_res::StateResolution::iterative_auth_check( @@ -673,7 +651,6 @@ async fn join_room_by_id_helper( &sorted_event_ids, &resolved_control_events, &mut event_map, - &db.rooms, ) .expect("iterative auth check failed on resolved events"); @@ -696,17 +673,17 @@ async fn join_room_by_id_helper( pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); db.rooms.append_pdu( - &PduEvent::from(&**pdu), + &pdu, utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"), count, pdu_id.clone().into(), - &db.globals, - &db.account_data, - &db.admin, + &db, )?; if state_events.contains(ev_id) { - state.insert((pdu.kind(), pdu.state_key()), pdu_id); + if let Some(key) = &pdu.state_key { + state.insert((pdu.kind(), key.to_string()), pdu_id); + } } } @@ -730,11 +707,7 @@ async fn join_room_by_id_helper( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 39a61cb..04f27de 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -8,7 +8,10 @@ use ruma::{ events::EventContent, EventId, }; -use std::convert::{TryFrom, TryInto}; +use std::{ + collections::BTreeMap, + convert::{TryFrom, TryInto}, +}; #[cfg(feature = "conduit_bin")] use rocket::{get, put}; @@ -47,7 +50,7 @@ pub async fn send_message_event_route( return Ok(send_message_event::Response { event_id }.into()); } - let mut unsigned = serde_json::Map::new(); + let mut unsigned = BTreeMap::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); let event_id = db.rooms.build_and_append_pdu( @@ -66,11 +69,7 @@ pub async fn send_message_event_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.transaction_ids.add_txnid( diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index bd8425a..7e57c1e 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -64,11 +64,7 @@ pub async fn set_displayname_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Presence update @@ -163,11 +159,7 @@ pub async fn set_avatar_url_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Presence update diff --git a/src/client_server/push.rs b/src/client_server/push.rs index 5403f96..a4e6161 100644 --- a/src/client_server/push.rs +++ b/src/client_server/push.rs @@ -5,10 +5,11 @@ use ruma::{ error::ErrorKind, r0::push::{ delete_pushrule, get_pushers, get_pushrule, get_pushrule_actions, get_pushrule_enabled, - get_pushrules_all, set_pushrule, set_pushrule_actions, set_pushrule_enabled, RuleKind, + get_pushrules_all, set_pusher, set_pushrule, set_pushrule_actions, + set_pushrule_enabled, RuleKind, }, }, - events::EventType, + events::{push_rules, EventType}, push::{ ConditionalPushRuleInit, ContentPushRule, OverridePushRule, PatternedPushRuleInit, RoomPushRule, SenderPushRule, SimplePushRuleInit, UnderridePushRule, @@ -31,7 +32,7 @@ pub async fn get_pushrules_all_route( let event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -56,7 +57,7 @@ pub async fn get_pushrule_route( let event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -95,7 +96,10 @@ pub async fn get_pushrule_route( if let Some(rule) = rule { Ok(get_pushrule::Response { rule }.into()) } else { - Err(Error::BadRequest(ErrorKind::NotFound, "Push rule not found.")) + Err(Error::BadRequest( + ErrorKind::NotFound, + "Push rule not found.", + )) } } @@ -119,7 +123,7 @@ pub async fn set_pushrule_route( let mut event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -266,7 +270,7 @@ pub async fn get_pushrule_actions_route( let mut event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -330,7 +334,7 @@ pub async fn set_pushrule_actions_route( let mut event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -434,7 +438,7 @@ pub async fn get_pushrule_enabled_route( let mut event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -495,7 +499,7 @@ pub async fn set_pushrule_enabled_route( let mut event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -599,7 +603,7 @@ pub async fn delete_pushrule_route( let mut event = db .account_data - .get::(None, &sender_user, EventType::PushRules)? + .get::(None, &sender_user, EventType::PushRules)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "PushRules event not found.", @@ -673,22 +677,38 @@ pub async fn delete_pushrule_route( Ok(delete_pushrule::Response.into()) } -#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/pushers"))] -#[tracing::instrument] -pub async fn get_pushers_route() -> ConduitResult { +#[cfg_attr( + feature = "conduit_bin", + get("/_matrix/client/r0/pushers", data = "") +)] +#[tracing::instrument(skip(db))] +pub async fn get_pushers_route( + db: State<'_, Database>, + body: Ruma, +) -> ConduitResult { + let sender = body.sender_user.as_ref().expect("authenticated endpoint"); + Ok(get_pushers::Response { - pushers: Vec::new(), + pushers: db.pusher.get_pusher(sender)?, } .into()) } -#[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/pushers/set"))] +#[cfg_attr( + feature = "conduit_bin", + post("/_matrix/client/r0/pushers/set", data = "") +)] #[tracing::instrument(skip(db))] -pub async fn set_pushers_route(db: State<'_, Database>) -> ConduitResult { +pub async fn set_pushers_route( + db: State<'_, Database>, + body: Ruma, +) -> ConduitResult { + let sender = body.sender_user.as_ref().expect("authenticated endpoint"); + let pusher = body.pusher.clone(); + + db.pusher.set_pusher(sender, pusher)?; + db.flush().await?; - Ok(get_pushers::Response { - pushers: Vec::new(), - } - .into()) + Ok(set_pusher::Response::default().into()) } diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index af277db..be5d3b1 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -32,11 +32,7 @@ pub async fn redact_event_route( }, &sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; diff --git a/src/client_server/room.rs b/src/client_server/room.rs index e2c931c..409028c 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -66,11 +66,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 2. Let the room creator join @@ -91,11 +87,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 3. Power levels @@ -136,11 +128,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 4. Events set by preset @@ -176,11 +164,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 4.2 History Visibility @@ -197,11 +181,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 4.3 Guest Access @@ -226,11 +206,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // 5. Events listed in initial_state @@ -245,16 +221,8 @@ pub async fn create_room_route( continue; } - db.rooms.build_and_append_pdu( - pdu_builder, - &sender_user, - &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, - )?; + db.rooms + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db)?; } // 6. Events implied by name and topic @@ -274,11 +242,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } @@ -296,11 +260,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } @@ -323,11 +283,7 @@ pub async fn create_room_route( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } @@ -416,11 +372,7 @@ pub async fn upgrade_room_route( }, sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Get the old room federations status @@ -460,11 +412,7 @@ pub async fn upgrade_room_route( }, sender_user, &replacement_room, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Join the new room @@ -485,11 +433,7 @@ pub async fn upgrade_room_route( }, sender_user, &replacement_room, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; // Recommended transferable state events list from the specs @@ -522,11 +466,7 @@ pub async fn upgrade_room_route( }, sender_user, &replacement_room, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; } @@ -569,11 +509,7 @@ pub async fn upgrade_room_route( }, sender_user, &body.room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; db.flush().await?; diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 073d94f..57bf7e5 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -289,11 +289,7 @@ pub async fn send_state_event_for_key_helper( }, &sender_user, &room_id, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, )?; Ok(event_id) diff --git a/src/database.rs b/src/database.rs index 6dc9c70..73706a4 100644 --- a/src/database.rs +++ b/src/database.rs @@ -4,6 +4,7 @@ pub mod appservice; pub mod globals; pub mod key_backups; pub mod media; +pub mod pusher; pub mod rooms; pub mod sending; pub mod transaction_ids; @@ -17,9 +18,11 @@ use log::info; use rocket::futures::{self, channel::mpsc}; use ruma::{DeviceId, ServerName, UserId}; use serde::Deserialize; -use std::collections::HashMap; -use std::fs::remove_dir_all; -use std::sync::{Arc, RwLock}; +use std::{ + collections::HashMap, + fs::remove_dir_all, + sync::{Arc, RwLock}, +}; use tokio::sync::Semaphore; #[derive(Clone, Deserialize)] @@ -76,6 +79,7 @@ pub struct Database { pub sending: sending::Sending, pub admin: admin::Admin, pub appservice: appservice::Appservice, + pub pusher: pusher::PushData, pub _db: sled::Db, } @@ -104,7 +108,7 @@ impl Database { let (admin_sender, admin_receiver) = mpsc::unbounded(); let db = Self { - globals: globals::Globals::load(db.open_tree("global")?, config).await?, + globals: globals::Globals::load(db.open_tree("global")?, config)?, users: users::Users { userid_password: db.open_tree("userid_password")?, userid_displayname: db.open_tree("userid_displayname")?, @@ -184,6 +188,7 @@ impl Database { cached_registrations: Arc::new(RwLock::new(HashMap::new())), id_appserviceregistrations: db.open_tree("id_appserviceregistrations")?, }, + pusher: pusher::PushData::new(&db)?, _db: db, }; diff --git a/src/database/admin.rs b/src/database/admin.rs index 160f55a..3014385 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -59,11 +59,7 @@ impl Admin { }, &conduit_user, &conduit_room, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - &db.appservice, + &db, ) .unwrap(); } diff --git a/src/database/appservice.rs b/src/database/appservice.rs index 26ea5b9..764291d 100644 --- a/src/database/appservice.rs +++ b/src/database/appservice.rs @@ -1,6 +1,8 @@ use crate::{utils, Error, Result}; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; #[derive(Clone)] pub struct Appservice { diff --git a/src/database/globals.rs b/src/database/globals.rs index 6004c10..6f89f6f 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -22,7 +22,7 @@ pub struct Globals { } impl Globals { - pub async fn load(globals: sled::Tree, config: Config) -> Result { + pub fn load(globals: sled::Tree, config: Config) -> Result { let bytes = &*globals .update_and_fetch("keypair", utils::generate_keypair)? .expect("utils::generate_keypair always returns Some"); diff --git a/src/database/key_backups.rs b/src/database/key_backups.rs index a50e45e..4c65354 100644 --- a/src/database/key_backups.rs +++ b/src/database/key_backups.rs @@ -2,7 +2,7 @@ use crate::{utils, Error, Result}; use ruma::{ api::client::{ error::ErrorKind, - r0::backup::{BackupAlgorithm, KeyData, Sessions}, + r0::backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, }, RoomId, UserId, }; @@ -129,7 +129,7 @@ impl KeyBackups { version: &str, room_id: &RoomId, session_id: &str, - key_data: &KeyData, + key_data: &KeyBackupData, globals: &super::globals::Globals, ) -> Result<()> { let mut key = user_id.to_string().as_bytes().to_vec(); @@ -153,7 +153,7 @@ impl KeyBackups { self.backupkeyid_backup.insert( &key, - &*serde_json::to_string(&key_data).expect("KeyData::to_string always works"), + &*serde_json::to_string(&key_data).expect("KeyBackupData::to_string always works"), )?; Ok(()) @@ -182,13 +182,17 @@ impl KeyBackups { .to_string()) } - pub fn get_all(&self, user_id: &UserId, version: &str) -> Result> { + pub fn get_all( + &self, + user_id: &UserId, + version: &str, + ) -> Result> { let mut prefix = user_id.to_string().as_bytes().to_vec(); prefix.push(0xff); prefix.extend_from_slice(version.as_bytes()); prefix.push(0xff); - let mut rooms = BTreeMap::::new(); + let mut rooms = BTreeMap::::new(); for result in self.backupkeyid_backup.scan_prefix(&prefix).map(|r| { let (key, value) = r?; @@ -211,15 +215,16 @@ impl KeyBackups { ) .map_err(|_| Error::bad_database("backupkeyid_backup room_id is invalid room id."))?; - let key_data = serde_json::from_slice(&value) - .map_err(|_| Error::bad_database("KeyData in backupkeyid_backup is invalid."))?; + let key_data = serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + })?; Ok::<_, Error>((room_id, session_id, key_data)) }) { let (room_id, session_id, key_data) = result?; rooms .entry(room_id) - .or_insert_with(|| Sessions { + .or_insert_with(|| RoomKeyBackup { sessions: BTreeMap::new(), }) .sessions @@ -234,7 +239,7 @@ impl KeyBackups { user_id: &UserId, version: &str, room_id: &RoomId, - ) -> BTreeMap { + ) -> BTreeMap { let mut prefix = user_id.to_string().as_bytes().to_vec(); prefix.push(0xff); prefix.extend_from_slice(version.as_bytes()); @@ -257,7 +262,7 @@ impl KeyBackups { })?; let key_data = serde_json::from_slice(&value).map_err(|_| { - Error::bad_database("KeyData in backupkeyid_backup is invalid.") + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") })?; Ok::<_, Error>((session_id, key_data)) @@ -272,7 +277,7 @@ impl KeyBackups { version: &str, room_id: &RoomId, session_id: &str, - ) -> Result> { + ) -> Result> { let mut key = user_id.to_string().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(version.as_bytes()); @@ -284,8 +289,9 @@ impl KeyBackups { self.backupkeyid_backup .get(&key)? .map(|value| { - serde_json::from_slice(&value) - .map_err(|_| Error::bad_database("KeyData in backupkeyid_backup is invalid.")) + serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + }) }) .transpose() } diff --git a/src/database/pusher.rs b/src/database/pusher.rs new file mode 100644 index 0000000..2380196 --- /dev/null +++ b/src/database/pusher.rs @@ -0,0 +1,578 @@ +use crate::{database::globals::Globals, Database, Error, PduEvent, Result}; +use log::{error, info, warn}; +use rocket::futures::stream::{FuturesUnordered, StreamExt}; +use ruma::{ + api::{ + client::r0::push::{Pusher, PusherKind}, + push_gateway::send_event_notification::v1::{ + self as send_event_notification, Device, Notification, NotificationCounts, + NotificationPriority, + }, + OutgoingRequest, + }, + events::room::{ + member::{MemberEventContent, MembershipState}, + message::{MessageEventContent, TextMessageEventContent}, + power_levels::PowerLevelsEventContent, + }, + events::EventType, + push::{Action, PushCondition, PushFormat, Ruleset, Tweak}, + uint, EventId, RoomAliasId, RoomId, UInt, UserId, +}; +use serde_json::value::RawValue as RawJsonValue; + +use std::{convert::TryFrom, fmt::Debug, time::Duration}; + +#[derive(Debug, Clone)] +pub struct PushData { + /// UserId + pushkey -> Pusher + pub(super) senderkey_pusher: sled::Tree, +} + +impl PushData { + pub fn new(db: &sled::Db) -> Result { + Ok(Self { + senderkey_pusher: db.open_tree("senderkey_pusher")?, + }) + } + + pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> { + let mut key = sender.as_bytes().to_vec(); + key.extend_from_slice(pusher.pushkey.as_bytes()); + + // There are 2 kinds of pushers but the spec says: null deletes the pusher. + if pusher.kind.is_none() { + return self + .senderkey_pusher + .remove(key) + .map(|_| ()) + .map_err(Into::into); + } + + self.senderkey_pusher.insert( + key, + &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), + )?; + + Ok(()) + } + + pub fn get_pusher(&self, sender: &UserId) -> Result> { + self.senderkey_pusher + .scan_prefix(sender.as_bytes()) + .values() + .map(|push: std::result::Result| { + let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?; + Ok(serde_json::from_slice(&*push) + .map_err(|_| Error::bad_database("Invalid Pusher in db."))?) + }) + .collect() + } +} + +pub async fn send_request( + globals: &crate::database::globals::Globals, + destination: &str, + request: T, +) -> Result +where + T: Debug, +{ + let destination = destination.replace("/_matrix/push/v1/notify", ""); + + let http_request = request + .try_into_http_request(&destination, Some("")) + .map_err(|e| { + warn!("Failed to find destination {}: {}", destination, e); + Error::BadServerResponse("Invalid destination") + })?; + + let mut reqwest_request = reqwest::Request::try_from(http_request) + .expect("all http requests are valid reqwest requests"); + + // TODO: we could keep this very short and let expo backoff do it's thing... + *reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); + + let url = reqwest_request.url().clone(); + let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; + + // Because reqwest::Response -> http::Response is complicated: + match reqwest_response { + Ok(mut reqwest_response) => { + let status = reqwest_response.status(); + let mut http_response = http::Response::builder().status(status); + let headers = http_response.headers_mut().unwrap(); + + for (k, v) in reqwest_response.headers_mut().drain() { + if let Some(key) = k { + headers.insert(key, v); + } + } + + let status = reqwest_response.status(); + + let body = reqwest_response + .bytes() + .await + .unwrap_or_else(|e| { + warn!("server error {}", e); + Vec::new().into() + }) // TODO: handle timeout + .into_iter() + .collect::>(); + + if status != 200 { + info!( + "Push gateway returned bad response {} {}\n{}\n{:?}", + destination, + status, + url, + crate::utils::string_from_bytes(&body) + ); + } + + let response = T::IncomingResponse::try_from( + http_response + .body(body) + .expect("reqwest body is valid http body"), + ); + response.map_err(|_| { + info!( + "Push gateway returned invalid response bytes {}\n{}", + destination, url + ); + Error::BadServerResponse("Push gateway returned bad response.") + }) + } + Err(e) => Err(e.into()), + } +} + +pub async fn send_push_notice( + user: &UserId, + unread: UInt, + pushers: &[Pusher], + ruleset: Ruleset, + pdu: &PduEvent, + db: &Database, +) -> Result<()> { + for rule in ruleset.into_iter() { + // TODO: can actions contain contradictory Actions + if rule + .actions + .iter() + .any(|act| matches!(act, ruma::push::Action::DontNotify)) + || !rule.enabled + { + continue; + } + + match rule.rule_id.as_str() { + ".m.rule.master" => {} + ".m.rule.suppress_notices" => { + if pdu.kind == EventType::RoomMessage + && pdu + .content + .get("msgtype") + .map_or(false, |ty| ty == "m.notice") + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + ".m.rule.invite_for_me" => { + if let EventType::RoomMember = &pdu.kind { + if pdu.state_key.as_deref() == Some(user.as_str()) + && serde_json::from_value::(pdu.content.clone()) + .map_err(|_| Error::bad_database("PDU contained bad message content"))? + .membership + == MembershipState::Invite + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + } + ".m.rule.member_event" => { + if let EventType::RoomMember = &pdu.kind { + // TODO use this? + let _member = serde_json::from_value::(pdu.content.clone()) + .map_err(|_| Error::bad_database("PDU contained bad message content"))?; + if let Some(conditions) = rule.conditions { + if conditions.iter().any(|cond| match cond { + PushCondition::EventMatch { key, pattern } => { + let mut json = + serde_json::to_value(pdu).expect("PDU is valid JSON"); + for key in key.split('.') { + json = json[key].clone(); + } + // TODO: this is baddddd + json.to_string().contains(pattern) + } + _ => false, + }) { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + } + } + ".m.rule.contains_display_name" => { + if let EventType::RoomMessage = &pdu.kind { + let msg_content = + serde_json::from_value::(pdu.content.clone()) + .map_err(|_| { + Error::bad_database("PDU contained bad message content") + })?; + if let MessageEventContent::Text(TextMessageEventContent { body, .. }) = + &msg_content + { + if body.contains(user.localpart()) { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + } + } + ".m.rule.tombstone" => { + if pdu.kind == EventType::RoomTombstone && pdu.state_key.as_deref() == Some("") { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + ".m.rule.roomnotif" => { + if let EventType::RoomMessage = &pdu.kind { + let msg_content = + serde_json::from_value::(pdu.content.clone()) + .map_err(|_| { + Error::bad_database("PDU contained bad message content") + })?; + if let MessageEventContent::Text(TextMessageEventContent { body, .. }) = + &msg_content + { + let power_level_cmp = |pl: PowerLevelsEventContent| { + &pl.notifications.room + <= pl.users.get(&pdu.sender).unwrap_or(&ruma::int!(0)) + }; + let deserialize = |pl: PduEvent| { + serde_json::from_value::(pl.content).ok() + }; + if body.contains("@room") + && db + .rooms + .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? + .map(|(_, pl)| pl) + .map(deserialize) + .flatten() + .map_or(false, power_level_cmp) + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + } + } + ".m.rule.contains_user_name" => { + if let EventType::RoomMessage = &pdu.kind { + let msg_content = + serde_json::from_value::(pdu.content.clone()) + .map_err(|_| { + Error::bad_database("PDU contained bad message content") + })?; + if let MessageEventContent::Text(TextMessageEventContent { body, .. }) = + &msg_content + { + if body.contains(user.localpart()) { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + } + } + ".m.rule.call" => { + if pdu.kind == EventType::CallInvite { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + ".m.rule.encrypted_room_one_to_one" => { + if db.rooms.room_members(&pdu.room_id).count() == 2 + && pdu.kind == EventType::RoomEncrypted + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + ".m.rule.room_one_to_one" => { + if db.rooms.room_members(&pdu.room_id).count() == 2 + && pdu.kind == EventType::RoomMessage + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + ".m.rule.message" => { + if pdu.kind == EventType::RoomMessage { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + ".m.rule.encrypted" => { + if pdu.kind == EventType::RoomEncrypted { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + break; + } + } + _ => {} + } + } + + Ok(()) +} + +async fn send_notice( + unread: UInt, + pushers: &[Pusher], + tweaks: Vec, + event: &PduEvent, + db: &Database, +) -> Result<()> { + let (http, _emails): (Vec<&Pusher>, _) = pushers + .iter() + .partition(|pusher| pusher.kind == Some(PusherKind::Http)); + + // TODO: + // Two problems with this + // 1. if "event_id_only" is the only format kind it seems we should never add more info + // 2. can pusher/devices have conflicting formats + let mut outgoing = FuturesUnordered::new(); + for pusher in http { + let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); + let url = if let Some(url) = pusher.data.url.as_ref() { + url + } else { + error!("Http Pusher must have URL specified."); + continue; + }; + + let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone()); + let mut data_minus_url = pusher.data.clone(); + // The url must be stripped off according to spec + data_minus_url.url = None; + device.data = Some(data_minus_url); + + // Tweaks are only added if the format is NOT event_id_only + if !event_id_only { + device.tweaks = tweaks.clone(); + } + + let devices = vec![device]; + let mut notifi = Notice { + devices, + prio: NotificationPriority::Low, + event_id: Some(event.event_id.clone()), + room_id: Some(event.room_id.clone()), + counts: NotificationCounts::new(unread, uint!(0)), + ..Default::default() + }; + + notifi.counts = NotificationCounts::new(unread, uint!(0)); + + if event.kind == EventType::RoomEncrypted + || tweaks + .iter() + .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) + { + notifi.prio = NotificationPriority::High + } + + if !event_id_only { + notifi.sender = Some(event.sender.clone()); + notifi.event_type = Some(event.kind.clone()); + notifi.content = serde_json::value::to_raw_value(&event.content).ok(); + + if event.kind == EventType::RoomMember { + notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str()); + } + + let user_name = db.users.displayname(&event.sender)?; + notifi.sender_display_name = user_name; + let room_name = db + .rooms + .room_state_get(&event.room_id, &EventType::RoomName, "")? + .map(|(_, pdu)| match pdu.content.get("name") { + Some(serde_json::Value::String(s)) => Some(s.to_string()), + _ => None, + }) + .flatten(); + notifi.room_name = room_name; + + outgoing.push(send_helper(notifi, url, &db.globals)); + continue; + } + + outgoing.push(send_helper(notifi, url, &db.globals)); + } + + loop { + match outgoing.next().await { + Some(Ok(_)) => continue, + Some(Err(_)) => return Err(Error::BadServerResponse("Server failed to respond")), + None => break, + } + } + // TODO: email + // for email in emails {} + + Ok(()) +} + +/// Internal Notifications struct to avoid liftime errors when batching +/// the futures. +#[derive(Debug, Default)] +struct Notice { + event_id: Option, + room_id: Option, + event_type: Option, + sender: Option, + sender_display_name: Option, + room_name: Option, + room_alias: Option, + user_is_target: bool, + prio: NotificationPriority, + content: Option>, + counts: NotificationCounts, + devices: Vec, +} + +async fn send_helper(notice: Notice, url: &str, globals: &Globals) -> Result<()> { + let Notice { + event_id, + room_id, + event_type, + sender, + sender_display_name, + room_alias, + room_name, + user_is_target, + prio, + counts, + content, + devices, + } = notice; + let mut outgoing = Notification::new(&devices); + outgoing.event_id = event_id.as_ref(); + outgoing.room_id = room_id.as_ref(); + outgoing.event_type = event_type.as_ref(); + outgoing.sender = sender.as_ref(); + outgoing.sender_display_name = sender_display_name.as_deref(); + outgoing.room_alias = room_alias.as_ref(); + outgoing.room_name = room_name.as_deref(); + outgoing.user_is_target = user_is_target; + outgoing.prio = prio; + outgoing.counts = counts; + outgoing.content = content; + send_request( + globals, + &url, + send_event_notification::Request::new(outgoing), + ) + .await?; + Ok(()) +} diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 6ee4f19..f225206 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -2,7 +2,7 @@ mod edus; pub use edus::RoomEdus; -use crate::{pdu::PduBuilder, utils, Error, PduEvent, Result}; +use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; use log::error; use regex::Regex; use ring::digest; @@ -20,7 +20,7 @@ use ruma::{ EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use sled::IVec; -use state_res::{event_auth, Error as StateError, Requester, StateEvent, StateMap, StateStore}; +use state_res::{event_auth, StateMap}; use std::{ collections::{BTreeMap, HashMap}, @@ -67,44 +67,6 @@ pub struct Rooms { pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid) } -impl StateStore for Rooms { - fn get_event( - &self, - room_id: &RoomId, - event_id: &EventId, - ) -> state_res::Result> { - let pid = self - .get_pdu_id(event_id) - .map_err(StateError::custom)? - .ok_or_else(|| { - StateError::NotFound(format!( - "PDU via room_id and event_id not found in the db: {}", - event_id.as_str() - )) - })?; - - serde_json::from_slice( - &self - .pduid_pdu - .get(pid) - .map_err(StateError::custom)? - .ok_or_else(|| StateError::NotFound("PDU via pduid not found in db.".into()))?, - ) - .map_err(Into::into) - .and_then(|pdu: StateEvent| { - // conduit's PDU's always contain a room_id but some - // of ruma's do not so this must be an Option - if pdu.room_id() == room_id { - Ok(Arc::new(pdu)) - } else { - Err(StateError::NotFound( - "Found PDU for incorrect room in db.".into(), - )) - } - }) - } -} - impl Rooms { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. @@ -113,7 +75,7 @@ impl Rooms { &self, room_id: &RoomId, state_hash: &StateHashId, - ) -> Result> { + ) -> Result> { self.stateid_pduid .scan_prefix(&state_hash) .values() @@ -142,7 +104,7 @@ impl Rooms { pdu, )) }) - .collect::>>() + .collect() } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). @@ -209,7 +171,7 @@ impl Rooms { content: serde_json::Value, ) -> Result> { let auth_events = state_res::auth_types_for_event( - kind.clone(), + &kind, sender, state_key.map(|s| s.to_string()), content, @@ -217,7 +179,13 @@ impl Rooms { let mut events = StateMap::new(); for (event_type, state_key) in auth_events { - if let Some((_, pdu)) = self.room_state_get(room_id, &event_type, &state_key)? { + if let Some((_, pdu)) = self.room_state_get( + room_id, + &event_type, + &state_key + .as_deref() + .ok_or_else(|| Error::Conflict("Found a non state event in auth events"))?, + )? { events.insert((event_type, state_key), pdu); } } @@ -295,7 +263,10 @@ impl Rooms { /// Returns the full room state. #[tracing::instrument(skip(self))] - pub fn room_state_full(&self, room_id: &RoomId) -> Result> { + pub fn room_state_full( + &self, + room_id: &RoomId, + ) -> Result> { if let Some(current_state_hash) = self.current_state_hash(room_id)? { self.state_full(&room_id, ¤t_state_hash) } else { @@ -456,9 +427,7 @@ impl Rooms { mut pdu_json: CanonicalJsonObject, count: u64, pdu_id: IVec, - globals: &super::globals::Globals, - account_data: &super::account_data::AccountData, - admin: &super::admin::Admin, + db: &Database, ) -> Result<()> { // Make unsigned fields correct. This is not properly documented in the spec, but state // events need to have previous content in the unsigned field, so clients can easily @@ -492,7 +461,7 @@ impl Rooms { // Mark as read first so the sending client doesn't get a notification even if appending // fails self.edus - .private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?; + .private_read_set(&pdu.room_id, &pdu.sender, count, &db.globals)?; self.pduid_pdu.insert( &pdu_id, @@ -503,6 +472,9 @@ impl Rooms { self.eventid_pduid .insert(pdu.event_id.as_bytes(), &*pdu_id)?; + // See if the event matches any known pushers + db.sending.send_push_pdu(&*pdu_id)?; + match pdu.kind { EventType::RoomRedaction => { if let Some(redact_id) = &pdu.redacts { @@ -527,8 +499,8 @@ impl Rooms { ) })?, &pdu.sender, - account_data, - globals, + &db.account_data, + &db.globals, )?; } } @@ -546,10 +518,10 @@ impl Rooms { self.tokenids.insert(key, &[])?; } - if body.starts_with(&format!("@conduit:{}: ", globals.server_name())) + if body.starts_with(&format!("@conduit:{}: ", db.globals.server_name())) && self .id_from_alias( - &format!("#admins:{}", globals.server_name()) + &format!("#admins:{}", db.globals.server_name()) .try_into() .expect("#admins:server_name is a valid room alias"), )? @@ -576,10 +548,11 @@ impl Rooms { ); match parsed_config { Ok(yaml) => { - admin.send(AdminCommand::RegisterAppservice(yaml)); + db.admin + .send(AdminCommand::RegisterAppservice(yaml)); } Err(e) => { - admin.send(AdminCommand::SendMessage( + db.admin.send(AdminCommand::SendMessage( message::MessageEventContent::text_plain( format!( "Could not parse appservice config: {}", @@ -590,7 +563,7 @@ impl Rooms { } } } else { - admin.send(AdminCommand::SendMessage( + db.admin.send(AdminCommand::SendMessage( message::MessageEventContent::text_plain( "Expected code block in command body.", ), @@ -598,10 +571,10 @@ impl Rooms { } } "list_appservices" => { - admin.send(AdminCommand::ListAppservices); + db.admin.send(AdminCommand::ListAppservices); } _ => { - admin.send(AdminCommand::SendMessage( + db.admin.send(AdminCommand::SendMessage( message::MessageEventContent::text_plain(format!( "Command: {}, Args: {:?}", command, args @@ -708,11 +681,7 @@ impl Rooms { pdu_builder: PduBuilder, sender: &UserId, room_id: &RoomId, - globals: &super::globals::Globals, - sending: &super::sending::Sending, - admin: &super::admin::Admin, - account_data: &super::account_data::AccountData, - appservice: &super::appservice::Appservice, + db: &Database, ) -> Result { let PduBuilder { event_type, @@ -795,7 +764,7 @@ impl Rooms { if !match event_type { EventType::RoomEncryption => { // Only allow encryption events if it's allowed in the config - globals.allow_encryption() + db.globals.allow_encryption() } EventType::RoomMember => { let prev_event = self @@ -803,23 +772,17 @@ impl Rooms { ErrorKind::Unknown, "Membership can't be the first event", ))?)? - .map(|pdu| pdu.convert_for_state_res()); + .map(Arc::new); event_auth::valid_membership_change( - // TODO this is a bit of a hack but not sure how to have a type - // declared in `state_res` crate easily convert to/from conduit::PduEvent - Requester { - prev_event_ids: prev_events.to_owned(), - room_id: &room_id, - content: &content, - state_key: Some(state_key.to_owned()), - sender: &sender, - }, + Some(state_key.as_str()), + sender, + content.clone(), prev_event, None, // TODO: third party invite &auth_events .iter() .map(|((ty, key), pdu)| { - Ok(((ty.clone(), key.clone()), pdu.convert_for_state_res())) + Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone()))) }) .collect::>>()?, ) @@ -907,13 +870,13 @@ impl Rooms { // Add origin because synapse likes that (and it's required in the spec) pdu_json.insert( "origin".to_owned(), - to_canonical_value(globals.server_name()) + to_canonical_value(db.globals.server_name()) .expect("server name is a valid CanonicalJsonValue"), ); ruma::signatures::hash_and_sign_event( - globals.server_name().as_str(), - globals.keypair(), + db.globals.server_name().as_str(), + db.globals.keypair(), &mut pdu_json, &RoomVersionId::Version6, ) @@ -934,24 +897,16 @@ impl Rooms { // Increment the last index and use that // This is also the next_batch/since value - let count = globals.next_count()?; + let count = db.globals.next_count()?; let mut pdu_id = room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = self.append_to_state(&pdu_id, &pdu, &globals)?; - - self.append_pdu( - &pdu, - pdu_json, - count, - pdu_id.clone().into(), - globals, - account_data, - admin, - )?; + let statehashid = self.append_to_state(&pdu_id, &pdu, &db.globals)?; + + self.append_pdu(&pdu, pdu_json, count, pdu_id.clone().into(), db)?; // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist @@ -960,31 +915,28 @@ impl Rooms { for server in self .room_servers(room_id) .filter_map(|r| r.ok()) - .filter(|server| &**server != globals.server_name()) + .filter(|server| &**server != db.globals.server_name()) { - sending.send_pdu(&server, &pdu_id)?; + db.sending.send_pdu(&server, &pdu_id)?; } - for appservice in appservice.iter_all().filter_map(|r| r.ok()) { + for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces .get("users") .and_then(|users| users.as_sequence()) - .map_or_else( - Vec::new, - |users| { - users - .iter() - .map(|users| { - users - .get("regex") - .and_then(|regex| regex.as_str()) - .and_then(|regex| Regex::new(regex).ok()) - }) - .filter_map(|o| o) - .collect::>() - }, - ); + .map_or_else(Vec::new, |users| { + users + .iter() + .map(|users| { + users + .get("regex") + .and_then(|regex| regex.as_str()) + .and_then(|regex| Regex::new(regex).ok()) + }) + .filter_map(|o| o) + .collect::>() + }); let aliases = namespaces .get("aliases") .and_then(|users| users.get("regex")) @@ -1001,7 +953,7 @@ impl Rooms { .get("sender_localpart") .and_then(|string| string.as_str()) .and_then(|string| { - UserId::parse_with_server_name(string, globals.server_name()).ok() + UserId::parse_with_server_name(string, db.globals.server_name()).ok() }); #[allow(clippy::blocks_in_if_conditions)] if bridge_user_id.map_or(false, |bridge_user_id| { @@ -1023,7 +975,7 @@ impl Rooms { .filter_map(|r| r.ok()) .any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) { - sending.send_pdu_appservice(&appservice.0, &pdu_id)?; + db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; } } } diff --git a/src/database/sending.rs b/src/database/sending.rs index 1ae063f..ffb2c31 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,56 +1,66 @@ use std::{ collections::HashMap, convert::TryFrom, - fmt::Debug, + fmt::{Debug, Display, Formatter}, sync::Arc, time::{Duration, Instant, SystemTime}, }; -use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; +use crate::{appservice_server, server_server, utils, Database, Error, PduEvent, Result}; use federation::transactions::send_transaction_message; use log::{info, warn}; use ring::digest; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ api::{appservice, federation, OutgoingRequest}, - ServerName, + events::{push_rules, EventType}, + uint, ServerName, UInt, }; use sled::IVec; -use tokio::select; -use tokio::sync::Semaphore; +use tokio::{select, sync::Semaphore}; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum OutgoingKind { + Appservice(Box), + Push(Vec), + Normal(Box), +} + +impl Display for OutgoingKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + OutgoingKind::Appservice(name) => f.write_str(name.as_str()), + OutgoingKind::Normal(name) => f.write_str(name.as_str()), + OutgoingKind::Push(_) => f.write_str("Push notification TODO"), + } + } +} #[derive(Clone)] pub struct Sending { /// The state for a given state hash. - pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+)ServerName + PduId - pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+)ServerName + PduId (pduid can be empty for reservation) + pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)ServerName / UserId + PduId + pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation) pub(super) maximum_requests: Arc, } impl Sending { - pub fn start_handler( - &self, - globals: &super::globals::Globals, - rooms: &super::rooms::Rooms, - appservice: &super::appservice::Appservice, - ) { + pub fn start_handler(&self, db: &Database) { let servernamepduids = self.servernamepduids.clone(); let servercurrentpdus = self.servercurrentpdus.clone(); + let db = db.clone(); let maximum_requests = self.maximum_requests.clone(); - let rooms = rooms.clone(); - let globals = globals.clone(); - let appservice = appservice.clone(); tokio::spawn(async move { let mut futures = FuturesUnordered::new(); // Retry requests we could not finish yet - let mut current_transactions = HashMap::<(Box, bool), Vec>::new(); + let mut current_transactions = HashMap::>::new(); - for (key, server, pdu, is_appservice) in servercurrentpdus + for (key, (kind, pdu)) in servercurrentpdus .iter() .filter_map(|r| r.ok()) - .filter_map(|(key, _)| Self::parse_servercurrentpdus(key).ok()) + .filter_map(|(key, _)| Some((key, Self::parse_servercurrentpdus(key).ok()?))) { if pdu.is_empty() { // Remove old reservation key @@ -58,9 +68,7 @@ impl Sending { continue; } - let entry = current_transactions - .entry((server, is_appservice)) - .or_insert_with(Vec::new); + let entry = current_transactions.entry(kind).or_insert_with(Vec::new); if entry.len() > 30 { warn!("Dropping some current pdus because too many were queued. This should not happen."); @@ -71,42 +79,56 @@ impl Sending { entry.push(pdu); } - for ((server, is_appservice), pdus) in current_transactions { + for (kind, pdus) in current_transactions { // Create new reservation - let mut prefix = if is_appservice { - b"+".to_vec() - } else { - Vec::new() + let mut prefix = match &kind { + OutgoingKind::Appservice(server) => { + let mut p = b"+".to_vec(); + p.extend_from_slice(server.as_bytes()); + p + } + OutgoingKind::Push(id) => { + let mut p = b"$".to_vec(); + p.extend_from_slice(&id); + p + } + OutgoingKind::Normal(server) => { + let mut p = vec![]; + p.extend_from_slice(server.as_bytes()); + p + } }; - prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); servercurrentpdus.insert(prefix, &[]).unwrap(); - futures.push(Self::handle_event( - server, - is_appservice, - pdus, - &globals, - &rooms, - &appservice, - &maximum_requests, - )); + futures.push(Self::handle_event(kind, pdus, &db, &maximum_requests)); } - let mut last_failed_try: HashMap, (u32, Instant)> = HashMap::new(); + let mut last_failed_try: HashMap = HashMap::new(); let mut subscriber = servernamepduids.watch_prefix(b""); loop { select! { Some(response) = futures.next() => { match response { - Ok((server, is_appservice)) => { - let mut prefix = if is_appservice { - b"+".to_vec() - } else { - Vec::new() + Ok(outgoing_kind) => { + let mut prefix = match &outgoing_kind { + OutgoingKind::Appservice(server) => { + let mut p = b"+".to_vec(); + p.extend_from_slice(server.as_bytes()); + p + } + OutgoingKind::Push(id) => { + let mut p = b"$".to_vec(); + p.extend_from_slice(&id); + p + }, + OutgoingKind::Normal(server) => { + let mut p = vec![]; + p.extend_from_slice(server.as_bytes()); + p + }, }; - prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); for key in servercurrentpdus @@ -139,23 +161,42 @@ impl Sending { servernamepduids.remove(¤t_key).unwrap(); } - futures.push(Self::handle_event(server, is_appservice, new_pdus, &globals, &rooms, &appservice, &maximum_requests)); + futures.push( + Self::handle_event( + outgoing_kind.clone(), + new_pdus, + &db, + &maximum_requests, + ) + ); } else { servercurrentpdus.remove(&prefix).unwrap(); // servercurrentpdus with the prefix should be empty now } } - Err((server, is_appservice, e)) => { - info!("Couldn't send transaction to {}\n{}", server, e); - let mut prefix = if is_appservice { - b"+".to_vec() - } else { - Vec::new() + Err((outgoing_kind, e)) => { + info!("Couldn't send transaction to {}\n{}", outgoing_kind, e); + let mut prefix = match &outgoing_kind { + OutgoingKind::Appservice(serv) => { + let mut p = b"+".to_vec(); + p.extend_from_slice(serv.as_bytes()); + p + }, + OutgoingKind::Push(id) => { + let mut p = b"$".to_vec(); + p.extend_from_slice(&id); + p + }, + OutgoingKind::Normal(serv) => { + let mut p = vec![]; + p.extend_from_slice(serv.as_bytes()); + p + }, }; - prefix.extend_from_slice(server.as_bytes()); + prefix.push(0xff); - last_failed_try.insert(server.clone(), match last_failed_try.get(&server) { + last_failed_try.insert(outgoing_kind.clone(), match last_failed_try.get(&outgoing_kind) { Some(last_failed) => { (last_failed.0+1, Instant::now()) }, @@ -164,57 +205,74 @@ impl Sending { } }); servercurrentpdus.remove(&prefix).unwrap(); + } - }; + } }, Some(event) = &mut subscriber => { if let sled::Event::Insert { key, .. } = event { let servernamepduid = key.clone(); let mut parts = servernamepduid.splitn(2, |&b| b == 0xff); - if let Some((server, is_appservice, pdu_id)) = utils::string_from_bytes( + let exponential_backoff = |(tries, instant): &(u32, Instant)| { + // Fail if a request has failed recently (exponential backoff) + let mut min_elapsed_duration = Duration::from_secs(60) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60*60*24) { + min_elapsed_duration = Duration::from_secs(60*60*24); + } + + instant.elapsed() < min_elapsed_duration + }; + if let Some((outgoing_kind, pdu_id)) = utils::string_from_bytes( parts .next() .expect("splitn will always return 1 or more elements"), ) - .map_err(|_| Error::bad_database("ServerName in servernamepduid bytes are invalid.")) - .map(|server_str| { + .map_err(|_| Error::bad_database("[Utf8] ServerName in servernamepduid bytes are invalid.")) + .and_then(|ident_str| { // Appservices start with a plus - if server_str.starts_with('+') { - (server_str[1..].to_owned(), true) + Ok(if ident_str.starts_with('+') { + OutgoingKind::Appservice( + Box::::try_from(&ident_str[1..]) + .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))? + ) + } else if ident_str.starts_with('$') { + OutgoingKind::Push(ident_str[1..].as_bytes().to_vec()) } else { - (server_str, false) - } + OutgoingKind::Normal( + Box::::try_from(ident_str) + .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))? + ) + }) }) - .and_then(|(server_str, is_appservice)| Box::::try_from(server_str) - .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid.")).map(|s| (s, is_appservice))) - .ok() - .and_then(|(server, is_appservice)| parts + .and_then(|outgoing_kind| parts .next() .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db.")) - .ok() - .map(|pdu_id| (server, is_appservice, pdu_id)) + .map(|pdu_id| (outgoing_kind, pdu_id)) ) - .filter(|(server, is_appservice, _)| { - #[allow(clippy::blocks_in_if_conditions)] - if last_failed_try.get(server).map_or(false, |(tries, instant)| { - // Fail if a request has failed recently (exponential backoff) - let mut min_elapsed_duration = Duration::from_secs(60) * *tries * *tries; - if min_elapsed_duration > Duration::from_secs(60*60*24) { - min_elapsed_duration = Duration::from_secs(60*60*24); - } - - instant.elapsed() < min_elapsed_duration - }) { + .ok() + .filter(|(outgoing_kind, _)| { + if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) { return false; } - let mut prefix = if *is_appservice { - b"+".to_vec() - } else { - Vec::new() + let mut prefix = match outgoing_kind { + OutgoingKind::Appservice(serv) => { + let mut p = b"+".to_vec(); + p.extend_from_slice(serv.as_bytes()); + p + }, + OutgoingKind::Push(id) => { + let mut p = b"$".to_vec(); + p.extend_from_slice(&id); + p + }, + OutgoingKind::Normal(serv) => { + let mut p = vec![]; + p.extend_from_slice(serv.as_bytes()); + p + }, }; - prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); servercurrentpdus @@ -225,15 +283,38 @@ impl Sending { servercurrentpdus.insert(&key, &[]).unwrap(); servernamepduids.remove(&key).unwrap(); - futures.push(Self::handle_event(server, is_appservice, vec![pdu_id.into()], &globals, &rooms, &appservice, &maximum_requests)); + futures.push( + Self::handle_event( + outgoing_kind, + vec![pdu_id.into()], + &db, + &maximum_requests + ) + ); } } - } + }, } } }); } + pub fn send_push_pdu(&self, pdu_id: &[u8]) -> Result<()> { + // Make sure we don't cause utf8 errors when parsing to a String... + let pduid = String::from_utf8_lossy(pdu_id).as_bytes().to_vec(); + + // these are valid ServerName chars + // (byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'.') + let mut key = b"$".to_vec(); + // keep each pdu push unique + key.extend_from_slice(pduid.as_slice()); + key.push(0xff); + key.extend_from_slice(pdu_id); + self.servernamepduids.insert(key, b"")?; + + Ok(()) + } + #[tracing::instrument(skip(self))] pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> { let mut key = server.as_bytes().to_vec(); @@ -263,117 +344,197 @@ impl Sending { hash.as_ref().to_owned() } - #[tracing::instrument(skip(globals, rooms, appservice))] + #[tracing::instrument(skip(db))] async fn handle_event( - server: Box, - is_appservice: bool, + kind: OutgoingKind, pdu_ids: Vec, - globals: &super::globals::Globals, - rooms: &super::rooms::Rooms, - appservice: &super::appservice::Appservice, + db: &Database, maximum_requests: &Semaphore, - ) -> std::result::Result<(Box, bool), (Box, bool, Error)> { - if is_appservice { - let pdu_jsons = pdu_ids - .iter() - .map(|pdu_id| { - Ok::<_, (Box, Error)>( - rooms - .get_pdu_from_id(pdu_id) - .map_err(|e| (server.clone(), e))? - .ok_or_else(|| { - ( - server.clone(), - Error::bad_database( - "Event in servernamepduids not found in db.", - ), - ) - })? - .to_any_event(), - ) - }) - .filter_map(|r| r.ok()) - .collect::>(); - - let permit = maximum_requests.acquire().await; - let response = appservice_server::send_request( - &globals, - appservice - .get_registration(server.as_str()) - .unwrap() - .unwrap(), // TODO: handle error - appservice::event::push_events::v1::Request { - events: &pdu_jsons, - txn_id: &base64::encode_config( - Self::calculate_hash(&pdu_ids), - base64::URL_SAFE_NO_PAD, - ), - }, - ) - .await - .map(|_response| (server.clone(), is_appservice)) - .map_err(|e| (server, is_appservice, e)); + ) -> std::result::Result { + match kind { + OutgoingKind::Appservice(server) => { + let pdu_jsons = pdu_ids + .iter() + .map(|pdu_id| { + Ok::<_, (Box, Error)>( + db.rooms + .get_pdu_from_id(pdu_id) + .map_err(|e| (server.clone(), e))? + .ok_or_else(|| { + ( + server.clone(), + Error::bad_database( + "[Appservice] Event in servernamepduids not found in ", + ), + ) + })? + .to_any_event(), + ) + }) + .filter_map(|r| r.ok()) + .collect::>(); + appservice_server::send_request( + &db.globals, + db.appservice + .get_registration(server.as_str()) + .unwrap() + .unwrap(), // TODO: handle error + appservice::event::push_events::v1::Request { + events: &pdu_jsons, + txn_id: &utils::random_string(16), + }, + ) + .await + .map(|_response| OutgoingKind::Appservice(server.clone())) + .map_err(|e| (OutgoingKind::Appservice(server.clone()), e)) + } + OutgoingKind::Push(id) => { + let pdus = pdu_ids + .iter() + .map(|pdu_id| { + Ok::<_, (Vec, Error)>( + db.rooms + .get_pdu_from_id(pdu_id) + .map_err(|e| (id.clone(), e))? + .ok_or_else(|| { + ( + id.clone(), + Error::bad_database( + "[Push] Event in servernamepduids not found in db.", + ), + ) + })?, + ) + }) + .filter_map(|r| r.ok()) + .collect::>(); + + for pdu in &pdus { + // Redacted events are not notification targets (we don't send push for them) + if pdu.unsigned.get("redacted_because").is_some() { + continue; + } - drop(permit); + // Skip events that came from the admin room + if db + .rooms + .room_aliases(&pdu.room_id) + .any(|alias| match alias { + Ok(a) => a.as_str().starts_with("#admins:"), + _ => false, + }) + || pdu.sender.as_str().starts_with("@conduit:") + { + continue; + } - response - } else { - let pdu_jsons = pdu_ids - .iter() - .map(|pdu_id| { - Ok::<_, (Box, Error)>( - // TODO: check room version and remove event_id if needed - serde_json::from_str( - PduEvent::convert_to_outgoing_federation_event( - rooms - .get_pdu_json_from_id(pdu_id) - .map_err(|e| (server.clone(), e))? - .ok_or_else(|| { - ( - server.clone(), - Error::bad_database( - "Event in servernamepduids not found in db.", - ), - ) - })?, - ) - .json() - .get(), - ) - .expect("Raw<..> is always valid"), - ) - }) - .filter_map(|r| r.ok()) - .collect::>(); - - let permit = maximum_requests.acquire().await; - let response = server_server::send_request( - &globals, - server.clone(), - send_transaction_message::v1::Request { - origin: globals.server_name(), - pdus: &pdu_jsons, - edus: &[], - origin_server_ts: SystemTime::now(), - transaction_id: &base64::encode_config( - Self::calculate_hash(&pdu_ids), - base64::URL_SAFE_NO_PAD, - ), - }, - ) - .await - .map(|_response| (server.clone(), is_appservice)) - .map_err(|e| (server, is_appservice, e)); + for user in db.rooms.room_members(&pdu.room_id) { + let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; + + // Don't notify the user of their own events + if user == pdu.sender { + continue; + } + + let pushers = db + .pusher + .get_pusher(&user) + .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; + + let rules_for_user = db + .account_data + .get::(None, &user, EventType::PushRules) + .map_err(|e| (OutgoingKind::Push(id.clone()), e))? + .map(|ev| ev.content.global) + .unwrap_or_else(|| crate::push_rules::default_pushrules(&user)); + + let unread: UInt = if let Some(last_read) = db + .rooms + .edus + .private_read_get(&pdu.room_id, &user) + .map_err(|e| (OutgoingKind::Push(id.clone()), e))? + { + (db.rooms + .pdus_since(&user, &pdu.room_id, last_read) + .map_err(|e| (OutgoingKind::Push(id.clone()), e))? + .filter_map(|pdu| pdu.ok()) // Filter out buggy events + .filter(|(_, pdu)| { + matches!( + pdu.kind.clone(), + EventType::RoomMessage | EventType::RoomEncrypted + ) + }) + .count() as u32) + .into() + } else { + // Just return zero unread messages + uint!(0) + }; - drop(permit); + crate::database::pusher::send_push_notice( + &user, + unread, + &pushers, + rules_for_user, + pdu, + db, + ) + .await + .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; + } + } - response + Ok(OutgoingKind::Push(id)) + } + OutgoingKind::Normal(server) => { + let pdu_jsons = pdu_ids + .iter() + .map(|pdu_id| { + Ok::<_, (OutgoingKind, Error)>( + // TODO: check room version and remove event_id if needed + serde_json::from_str( + PduEvent::convert_to_outgoing_federation_event( + db.rooms + .get_pdu_json_from_id(pdu_id) + .map_err(|e| (OutgoingKind::Normal(server.clone()), e))? + .ok_or_else(|| { + ( + OutgoingKind::Normal(server.clone()), + Error::bad_database( + "[Normal] Event in servernamepduids not found in db.", + ), + ) + })?, + ) + .json() + .get(), + ) + .expect("Raw<..> is always valid"), + ) + }) + .filter_map(|r| r.ok()) + .collect::>(); + + server_server::send_request( + &db.globals, + &*server, + send_transaction_message::v1::Request { + origin: db.globals.server_name(), + pdus: &pdu_jsons, + edus: &[], + origin_server_ts: SystemTime::now(), + transaction_id: &utils::random_string(16), + }, + ) + .await + .map(|_response| OutgoingKind::Normal(server.clone())) + .map_err(|e| (OutgoingKind::Normal(server.clone()), e)) + } } } - fn parse_servercurrentpdus(key: IVec) -> Result<(IVec, Box, IVec, bool)> { - let key2 = key.clone(); - let mut parts = key2.splitn(2, |&b| b == 0xff); + fn parse_servercurrentpdus(key: IVec) -> Result<(OutgoingKind, IVec)> { + let mut parts = key.splitn(2, |&b| b == 0xff); let server = parts.next().expect("splitn always returns one element"); let pdu = parts .next() @@ -384,27 +545,33 @@ impl Sending { })?; // Appservices start with a plus - let (server, is_appservice) = if server.starts_with('+') { - (&server[1..], true) + Ok::<_, Error>(if server.starts_with('+') { + ( + OutgoingKind::Appservice(Box::::try_from(server).map_err(|_| { + Error::bad_database("Invalid server string in server_currenttransaction") + })?), + IVec::from(pdu), + ) + } else if server.starts_with('$') { + ( + OutgoingKind::Push(server.as_bytes().to_vec()), + IVec::from(pdu), + ) } else { - (&*server, false) - }; - - Ok::<_, Error>(( - key, - Box::::try_from(server).map_err(|_| { - Error::bad_database("Invalid server string in server_currenttransaction") - })?, - IVec::from(pdu), - is_appservice, - )) + ( + OutgoingKind::Normal(Box::::try_from(server).map_err(|_| { + Error::bad_database("Invalid server string in server_currenttransaction") + })?), + IVec::from(pdu), + ) + }) } #[tracing::instrument(skip(self, globals))] pub async fn send_federation_request( &self, globals: &crate::database::globals::Globals, - destination: Box, + destination: &ServerName, request: T, ) -> Result where diff --git a/src/lib.rs b/src/lib.rs index 196626e..2d6ff72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::suspicious_else_formatting)] + pub mod appservice_server; pub mod client_server; mod database; diff --git a/src/main.rs b/src/main.rs index eff5552..dc1d9d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![allow(clippy::suspicious_else_formatting)] #![warn(rust_2018_idioms)] #![allow(clippy::suspicious_else_formatting)] @@ -152,6 +153,7 @@ fn setup_rocket() -> (rocket::Rocket, Config) { client_server::get_key_changes_route, client_server::get_pushers_route, client_server::set_pushers_route, + // client_server::third_party_route, client_server::upgrade_room_route, server_server::get_server_version_route, server_server::get_server_keys_route, diff --git a/src/pdu.rs b/src/pdu.rs index bcf5ffb..eab1af7 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -9,14 +9,9 @@ use ruma::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{ - collections::BTreeMap, - convert::{TryFrom, TryInto}, - sync::Arc, - time::UNIX_EPOCH, -}; +use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH}; -#[derive(Deserialize, Serialize, Debug)] +#[derive(Clone, Deserialize, Serialize, Debug)] pub struct PduEvent { pub event_id: EventId, pub room_id: RoomId, @@ -32,8 +27,8 @@ pub struct PduEvent { pub auth_events: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub redacts: Option, - #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")] - pub unsigned: serde_json::Map, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub unsigned: BTreeMap, pub hashes: EventHash, pub signatures: BTreeMap, BTreeMap>, } @@ -240,61 +235,85 @@ impl PduEvent { ) .expect("Raw::from_value always works") } + + pub fn from_id_val( + event_id: &EventId, + mut json: CanonicalJsonObject, + ) -> Result { + json.insert( + "event_id".to_string(), + ruma::serde::to_canonical_value(event_id).expect("event_id is a valid Value"), + ); + + serde_json::from_value(serde_json::to_value(json).expect("valid JSON")) + } } -impl From<&state_res::StateEvent> for PduEvent { - fn from(pdu: &state_res::StateEvent) -> Self { - Self { - event_id: pdu.event_id(), - room_id: pdu.room_id().clone(), - sender: pdu.sender().clone(), - origin_server_ts: (pdu - .origin_server_ts() - .duration_since(UNIX_EPOCH) - .expect("time is valid") - .as_millis() as u64) - .try_into() - .expect("time is valid"), - kind: pdu.kind(), - content: pdu.content().clone(), - state_key: Some(pdu.state_key()), - prev_events: pdu.prev_event_ids(), - depth: *pdu.depth(), - auth_events: pdu.auth_events(), - redacts: pdu.redacts().cloned(), - unsigned: pdu.unsigned().clone().into_iter().collect(), - hashes: pdu.hashes().clone(), - signatures: pdu.signatures(), - } +impl state_res::Event for PduEvent { + fn event_id(&self) -> &EventId { + &self.event_id + } + + fn room_id(&self) -> &RoomId { + &self.room_id + } + + fn sender(&self) -> &UserId { + &self.sender + } + fn kind(&self) -> EventType { + self.kind.clone() + } + + fn content(&self) -> serde_json::Value { + self.content.clone() + } + fn origin_server_ts(&self) -> std::time::SystemTime { + UNIX_EPOCH + std::time::Duration::from_millis(self.origin_server_ts.into()) + } + + fn state_key(&self) -> Option { + self.state_key.clone() + } + fn prev_events(&self) -> Vec { + self.prev_events.to_vec() + } + fn depth(&self) -> &UInt { + &self.depth + } + fn auth_events(&self) -> Vec { + self.auth_events.to_vec() + } + fn redacts(&self) -> Option<&EventId> { + self.redacts.as_ref() + } + fn hashes(&self) -> &EventHash { + &self.hashes + } + fn signatures(&self) -> BTreeMap, BTreeMap> { + self.signatures.clone() + } + fn unsigned(&self) -> &BTreeMap { + &self.unsigned } } -impl PduEvent { - pub fn convert_for_state_res(&self) -> Arc { - Arc::new( - // For consistency of eventId (just in case) we use the one - // generated by conduit for everything. - state_res::StateEvent::from_id_value( - self.event_id.clone(), - json!({ - "event_id": self.event_id, - "room_id": self.room_id, - "sender": self.sender, - "origin_server_ts": self.origin_server_ts, - "type": self.kind, - "content": self.content, - "state_key": self.state_key, - "prev_events": self.prev_events, - "depth": self.depth, - "auth_events": self.auth_events, - "redacts": self.redacts, - "unsigned": self.unsigned, - "hashes": self.hashes, - "signatures": self.signatures, - }), - ) - .expect("all conduit PDUs are state events"), - ) +// These impl's allow us to dedup state snapshots when resolving state +// for incoming events (federation/send/{txn}). +impl Eq for PduEvent {} +impl PartialEq for PduEvent { + fn eq(&self, other: &Self) -> bool { + self.event_id == other.event_id + } +} +impl PartialOrd for PduEvent { + fn partial_cmp(&self, other: &Self) -> Option { + self.event_id.partial_cmp(&other.event_id) + } +} +impl Ord for PduEvent { + fn cmp(&self, other: &Self) -> Ordering { + self.event_id.cmp(&other.event_id) } } @@ -328,7 +347,7 @@ pub struct PduBuilder { #[serde(rename = "type")] pub event_type: EventType, pub content: serde_json::Value, - pub unsigned: Option>, + pub unsigned: Option>, pub state_key: Option, pub redacts: Option, } diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 188d1b6..8513221 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -3,15 +3,11 @@ use ruma::{ identifiers::{DeviceId, UserId}, Outgoing, }; -use std::{ - convert::{TryInto}, - ops::Deref, -}; +use std::{convert::TryInto, ops::Deref}; #[cfg(feature = "conduit_bin")] use { crate::utils, - ruma::api::{AuthScheme, OutgoingRequest}, log::{debug, warn}, rocket::{ data::{ @@ -24,8 +20,9 @@ use { tokio::io::AsyncReadExt, Request, State, }, + ruma::api::{AuthScheme, OutgoingRequest}, + std::convert::TryFrom, std::io::Cursor, - std::convert::TryFrom, }; /// This struct converts rocket requests into ruma structs by converting them into http requests @@ -41,10 +38,7 @@ pub struct Ruma { #[cfg(feature = "conduit_bin")] impl<'a, T: Outgoing + OutgoingRequest> FromTransformedData<'a> for Ruma where - ::Incoming: TryFrom>> + std::fmt::Debug, - <::Incoming as std::convert::TryFrom< - http::request::Request>, - >>::Error: std::fmt::Debug, + T::Incoming: IncomingRequest, { type Error = (); type Owned = Data; @@ -152,7 +146,7 @@ where let http_request = http_request.body(body.clone()).unwrap(); debug!("{:?}", http_request); - match ::Incoming::try_from(http_request) { + match ::try_from_http_request(http_request) { Ok(t) => Success(Ruma { body: t, sender_user, @@ -165,7 +159,7 @@ where }), Err(e) => { warn!("{:?}", e); - Failure((Status::raw(583), ())) + Failure((Status::BadRequest, ())) } } }) diff --git a/src/server_server.rs b/src/server_server.rs index b9b8e31..0038c33 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,5 +1,4 @@ use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; -use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; use log::{info, warn}; use regex::Regex; @@ -13,7 +12,7 @@ use ruma::{ VerifyKey, }, event::get_missing_events, - query::get_profile_information, + query::get_profile_information::{self, v1::ProfileField}, transactions::send_transaction_message, }, OutgoingRequest, @@ -37,7 +36,7 @@ use { #[tracing::instrument(skip(globals))] pub async fn send_request( globals: &crate::database::globals::Globals, - destination: Box, + destination: &ServerName, request: T, ) -> Result where @@ -51,18 +50,18 @@ where .actual_destination_cache .read() .unwrap() - .get(&destination) + .get(destination) .cloned(); let (actual_destination, host) = if let Some(result) = maybe_result { result } else { - let result = find_actual_destination(globals, &destination).await; + let result = find_actual_destination(globals, destination).await; globals .actual_destination_cache .write() .unwrap() - .insert(destination.clone(), result.clone()); + .insert(Box::::from(destination), result.clone()); result }; @@ -583,15 +582,8 @@ pub async fn send_transaction_message_route<'a>( let next_room_state = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; - db.rooms.append_pdu( - &pdu, - value, - count, - pdu_id.clone().into(), - &db.globals, - &db.account_data, - &db.admin, - )?; + db.rooms + .append_pdu(&pdu, value, count, pdu_id.clone().into(), &db)?; db.rooms.set_room_state(&room_id, &next_room_state)?;