Browse Source

Merge branch 'sendtxn-start' into 'DevinR528/conduit-sendtxn'

Draft: More work on /federation/send/<txnid>

See merge request famedly/conduit!10
merge-requests/10/merge
Devin Ragotzy 5 years ago
parent
commit
8d95c59f5d
  1. 215
      Cargo.lock
  2. 4
      Cargo.toml
  3. 3
      src/client_server/media.rs
  4. 2
      src/client_server/membership.rs
  5. 115
      src/database/rooms.rs
  6. 2
      src/pdu.rs
  7. 741
      src/server_server.rs

215
Cargo.lock generated

@ -21,15 +21,6 @@ version = "1.2.0" @@ -21,15 +21,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "arrayref"
version = "0.3.6"
@ -182,19 +173,6 @@ version = "1.0.0" @@ -182,19 +173,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time 0.1.44",
"winapi 0.3.9",
]
[[package]]
name = "color_quant"
version = "1.1.0"
@ -254,7 +232,7 @@ version = "0.15.0-dev" @@ -254,7 +232,7 @@ version = "0.15.0-dev"
source = "git+https://github.com/SergioBenitez/cookie-rs.git?rev=1c3ca83#1c3ca838543b60a4448d279dc4b903cc7a2bc22a"
dependencies = [
"percent-encoding",
"time 0.2.23",
"time",
"version_check",
]
@ -575,19 +553,6 @@ dependencies = [ @@ -575,19 +553,6 @@ dependencies = [
"byteorder",
]
[[package]]
name = "generator"
version = "0.6.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc"
dependencies = [
"cc",
"libc",
"log",
"rustc_version",
"winapi 0.3.9",
]
[[package]]
name = "getrandom"
version = "0.1.15"
@ -596,7 +561,7 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" @@ -596,7 +561,7 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
dependencies = [
"cfg-if 0.1.10",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
"wasi",
]
[[package]]
@ -886,9 +851,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" @@ -886,9 +851,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.80"
version = "0.2.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614"
checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
[[package]]
name = "linked-hash-map"
@ -914,19 +879,6 @@ dependencies = [ @@ -914,19 +879,6 @@ dependencies = [
"cfg-if 0.1.10",
]
[[package]]
name = "loom"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed"
dependencies = [
"cfg-if 0.1.10",
"generator",
"scoped-tls",
"serde",
"serde_json",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@ -948,15 +900,6 @@ version = "0.1.0" @@ -948,15 +900,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.8"
@ -1468,31 +1411,6 @@ dependencies = [ @@ -1468,31 +1411,6 @@ dependencies = [
"syn",
]
[[package]]
name = "regex"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
@ -1585,7 +1503,7 @@ dependencies = [ @@ -1585,7 +1503,7 @@ dependencies = [
"rocket_http",
"serde",
"state",
"time 0.2.23",
"time",
"tokio",
"ubyte",
"version_check",
@ -1622,7 +1540,7 @@ dependencies = [ @@ -1622,7 +1540,7 @@ dependencies = [
"ref-cast",
"smallvec",
"state",
"time 0.2.23",
"time",
"tokio",
"tokio-rustls",
"uncased",
@ -1633,7 +1551,7 @@ dependencies = [ @@ -1633,7 +1551,7 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.0.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"assign",
"js_int",
@ -1651,7 +1569,7 @@ dependencies = [ @@ -1651,7 +1569,7 @@ dependencies = [
[[package]]
name = "ruma-api"
version = "0.17.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"http",
"percent-encoding",
@ -1666,7 +1584,7 @@ dependencies = [ @@ -1666,7 +1584,7 @@ dependencies = [
[[package]]
name = "ruma-api-macros"
version = "0.17.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -1677,7 +1595,7 @@ dependencies = [ @@ -1677,7 +1595,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.2.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"ruma-api",
"ruma-common",
@ -1691,7 +1609,7 @@ dependencies = [ @@ -1691,7 +1609,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.10.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"assign",
"http",
@ -1710,7 +1628,7 @@ dependencies = [ @@ -1710,7 +1628,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.2.0"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"js_int",
"maplit",
@ -1723,7 +1641,7 @@ dependencies = [ @@ -1723,7 +1641,7 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.22.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"js_int",
"ruma-common",
@ -1737,7 +1655,7 @@ dependencies = [ @@ -1737,7 +1655,7 @@ dependencies = [
[[package]]
name = "ruma-events-macros"
version = "0.22.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -1748,7 +1666,7 @@ dependencies = [ @@ -1748,7 +1666,7 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.0.3"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"js_int",
"ruma-api",
@ -1763,7 +1681,7 @@ dependencies = [ @@ -1763,7 +1681,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers"
version = "0.17.4"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"paste",
"rand",
@ -1777,7 +1695,7 @@ dependencies = [ @@ -1777,7 +1695,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-macros"
version = "0.17.4"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"proc-macro2",
"quote",
@ -1788,7 +1706,7 @@ dependencies = [ @@ -1788,7 +1706,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.1.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"serde",
]
@ -1796,7 +1714,7 @@ dependencies = [ @@ -1796,7 +1714,7 @@ dependencies = [
[[package]]
name = "ruma-serde"
version = "0.2.3"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"form_urlencoded",
"itoa",
@ -1809,7 +1727,7 @@ dependencies = [ @@ -1809,7 +1727,7 @@ dependencies = [
[[package]]
name = "ruma-serde-macros"
version = "0.2.0"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -1820,7 +1738,7 @@ dependencies = [ @@ -1820,7 +1738,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.6.0-dev.1"
source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237"
source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef"
dependencies = [
"base64 0.12.3",
"ring",
@ -1948,18 +1866,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" @@ -1948,18 +1866,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.117"
version = "1.0.118"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a"
checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.117"
version = "1.0.118"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e"
checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df"
dependencies = [
"proc-macro2",
"quote",
@ -1995,16 +1913,6 @@ version = "0.6.0" @@ -1995,16 +1913,6 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
[[package]]
name = "sharded-slab"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127"
dependencies = [
"lazy_static",
"loom",
]
[[package]]
name = "signal-hook-registry"
version = "1.2.2"
@ -2078,17 +1986,15 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" @@ -2078,17 +1986,15 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483"
[[package]]
name = "state-res"
version = "0.1.0"
source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#99214e6fa6b9843b0d9e1f6ef0698d7fdb234fb2"
source = "git+https://github.com/ruma/state-res?branch=conflict#e4ba824806e7b780c23ca8120b57c5a7e4ab787d"
dependencies = [
"itertools",
"js_int",
"maplit",
"ruma",
"serde",
"serde_json",
"thiserror",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -2206,26 +2112,6 @@ dependencies = [ @@ -2206,26 +2112,6 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
dependencies = [
"lazy_static",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "time"
version = "0.2.23"
@ -2407,49 +2293,6 @@ dependencies = [ @@ -2407,49 +2293,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
name = "trust-dns-proto"
version = "0.19.6"
@ -2599,12 +2442,6 @@ version = "0.9.0+wasi-snapshot-preview1" @@ -2599,12 +2442,6 @@ version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.69"

4
Cargo.toml

@ -18,13 +18,13 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f33 @@ -18,13 +18,13 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f33
#rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
# Used for matrix spec type definitions and helpers
ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "e8882fe8142d7b55ed4c8ccc6150946945f9e237" }
ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "ee814aa84934530d76f5e4b275d739805b49bdef" }
# ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "unstable-join" }
# ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
# Used when doing state resolution
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] }
state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] }
state-res = { git = "https://github.com/ruma/state-res", branch = "conflict", features = ["unstable-pre-spec", "gen-eventid"] }
# state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
# Used for long polling and federation sender, should be the same as rocket::tokio

3
src/client_server/media.rs

@ -45,7 +45,8 @@ pub async fn create_content_route( @@ -45,7 +45,8 @@ pub async fn create_content_route(
db.flush().await?;
Ok(create_content::Response { content_uri: mxc }.into())
// TODO: blurhash is the other field not sure if that is important
Ok(create_content::Response::new(mxc).into())
}
#[cfg_attr(

2
src/client_server/membership.rs

@ -667,7 +667,7 @@ async fn join_room_by_id_helper( @@ -667,7 +667,7 @@ async fn join_room_by_id_helper(
// this is a `state_res::StateEvent` that holds a `ruma::Pdu`
let pdu = event_map
.get(ev_id)
.expect("Found event_id in sorted events that is not in resolved state");
.expect("found event_id in sorted events that is not in resolved state");
// We do not rebuild the PDU in this case only insert to DB
let count = db.globals.next_count()?;

115
src/database/rooms.rs

@ -36,16 +36,6 @@ use super::admin::AdminCommand; @@ -36,16 +36,6 @@ use super::admin::AdminCommand;
/// hashing the entire state.
pub type StateHashId = IVec;
/// An enum that represents the two valid states when searching
/// for an events "parent".
///
/// An events parent is any event we are aware of that is part of
/// the events `prev_events` array.
pub(crate) enum ClosestParent {
Append,
Insert(u64),
}
#[derive(Clone)]
pub struct Rooms {
pub edus: edus::RoomEdus,
@ -145,6 +135,29 @@ impl Rooms { @@ -145,6 +135,29 @@ impl Rooms {
.collect::<Result<StateMap<_>>>()
}
/// Returns all state entries we know of from a specific event_id onward.
/// The PDU's are ordered most recent to least recent.
///
/// This is used to resolve a forward extremity. We have the other servers
/// arm now we need ours.
/// TODO: Probably return Vec<PrevEvents<PduEvent>>
pub fn state_from(&self, room_id: &RoomId, event_id: &EventId) -> Result<Vec<PduEvent>> {
let mut last_ids = self.get_pdu_leaves(room_id)?;
let mut collected = vec![];
while let Some(find_id) = last_ids.pop() {
if event_id == &find_id {
break;
}
if let Some(pdu) = self.get_pdu(&find_id)? {
last_ids.extend(pdu.prev_events.to_vec());
collected.push(pdu);
}
}
Ok(collected)
}
/// Returns all state entries for this type.
pub fn state_type(
&self,
@ -205,7 +218,7 @@ impl Rooms { @@ -205,7 +218,7 @@ impl Rooms {
})
}
/// Returns the last state hash key added to the db.
/// Returns the state hash key for the given pdu.
pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> {
Ok(self.pduid_statehash.get(pdu_id)?)
}
@ -263,6 +276,36 @@ impl Rooms { @@ -263,6 +276,36 @@ impl Rooms {
.is_some())
}
/// Force the creation of a new StateHash and insert it into the db. This also associates
/// the given `pdu` with the new StateHash.
///
/// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
pub fn force_state_with_pdu(
&self,
new_pdu_id: &[u8],
room_id: &RoomId,
state: HashMap<(EventType, String), Vec<u8>>,
) -> Result<()> {
let state_hash =
self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::<Vec<_>>())?;
let mut prefix = state_hash.to_vec();
prefix.push(0xff);
for ((event_type, state_key), pdu_id) in state {
let mut state_id = prefix.clone();
state_id.extend_from_slice(&event_type.as_ref().as_bytes());
state_id.push(0xff);
state_id.extend_from_slice(&state_key.as_bytes());
self.stateid_pduid.insert(state_id, pdu_id.to_vec())?;
}
self.pduid_statehash.insert(new_pdu_id, &*state_hash)?;
self.roomid_statehash
.insert(room_id.as_bytes(), &*state_hash)?;
Ok(())
}
/// Force the creation of a new StateHash and insert it into the db.
///
/// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
@ -281,7 +324,7 @@ impl Rooms { @@ -281,7 +324,7 @@ impl Rooms {
state_id.extend_from_slice(&event_type.as_ref().as_bytes());
state_id.push(0xff);
state_id.extend_from_slice(&state_key.as_bytes());
self.stateid_pduid.insert(state_id, pdu_id)?;
self.stateid_pduid.insert(state_id, pdu_id.to_vec())?;
}
self.roomid_statehash
@ -411,54 +454,6 @@ impl Rooms { @@ -411,54 +454,6 @@ impl Rooms {
}
}
/// Recursively search for a PDU from our DB that is also in the
/// `prev_events` field of the incoming PDU.
///
/// First we check if the last PDU inserted to the given room is a parent
/// if not we recursively check older `prev_events` to insert the incoming
/// event after.
pub(crate) fn get_latest_pduid_before(
&self,
room: &RoomId,
incoming_prev_ids: &[EventId],
their_state: &BTreeMap<EventId, Arc<StateEvent>>,
) -> Result<Option<ClosestParent>> {
match self.pduid_pdu.scan_prefix(room.as_bytes()).last() {
Some(Ok(val))
if incoming_prev_ids.contains(
&serde_json::from_slice::<PduEvent>(&val.1)
.map_err(|_| {
Error::bad_database("last DB entry contains invalid PDU bytes")
})?
.event_id,
) =>
{
Ok(Some(ClosestParent::Append))
}
_ => {
let mut prev_ids = incoming_prev_ids.to_vec();
while let Some(id) = prev_ids.pop() {
match self.get_pdu_id(&id)? {
Some(pdu_id) => {
return Ok(Some(ClosestParent::Insert(self.pdu_count(&pdu_id)?)));
}
None => {
prev_ids.extend(their_state.get(&id).map_or(
Err(Error::BadServerResponse(
"Failed to find previous event for PDU in state",
)),
// `prev_event_ids` will return an empty Vec instead of failing
// so it works perfect for our use here
|pdu| Ok(pdu.prev_event_ids()),
)?);
}
}
}
Ok(None)
}
}
}
/// Returns the leaf pdus of a room.
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<Vec<EventId>> {
let mut prefix = room_id.as_bytes().to_vec();

2
src/pdu.rs

@ -17,7 +17,7 @@ use std::{ @@ -17,7 +17,7 @@ use std::{
time::UNIX_EPOCH,
};
#[derive(Deserialize, Serialize, Debug)]
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct PduEvent {
pub event_id: EventId,
pub room_id: RoomId,

741
src/server_server.rs

@ -1,7 +1,4 @@ @@ -1,7 +1,4 @@
use crate::{
client_server, database::rooms::ClosestParent, utils, ConduitResult, Database, Error, PduEvent,
Result, Ruma,
};
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
use get_profile_information::v1::ProfileField;
use http::header::{HeaderValue, AUTHORIZATION, HOST};
use log::{error, warn};
@ -21,10 +18,10 @@ use ruma::{ @@ -21,10 +18,10 @@ use ruma::{
OutgoingRequest,
},
directory::{IncomingFilter, IncomingRoomNetwork},
EventId, RoomId, ServerName, ServerSigningKeyId, UserId,
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
};
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
fmt::Debug,
sync::Arc,
@ -390,6 +387,34 @@ pub async fn get_public_rooms_route( @@ -390,6 +387,34 @@ pub async fn get_public_rooms_route(
.into())
}
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum PrevEvents<T> {
Sequential(T),
Fork(Vec<T>),
}
impl<T> IntoIterator for PrevEvents<T> {
type Item = T;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
match self {
Self::Sequential(item) => vec![item].into_iter(),
Self::Fork(list) => list.into_iter(),
}
}
}
impl<T: Clone> PrevEvents<T> {
pub fn new(id: &[T]) -> Self {
match id {
[] => panic!("All events must have previous event"),
[single_id] => Self::Sequential(single_id.clone()),
rest => Self::Fork(rest.to_vec()),
}
}
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/federation/v1/send/<_>", data = "<body>")
@ -445,16 +470,23 @@ pub async fn send_transaction_message_route<'a>( @@ -445,16 +470,23 @@ pub async fn send_transaction_message_route<'a>(
// discard the event whereas the Client Server API's /send/{eventType} endpoint
// would return a M_BAD_JSON error.
let mut resolved_map = BTreeMap::new();
for pdu in &body.pdus {
// Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped.
let mut pdu_idx = 0;
// This is `for pdu in &body.pdus` but we need to do fancy continue/break so we need loop labels
'outer: loop {
if pdu_idx == body.pdus.len() {
break 'outer;
}
let pdu = &body.pdus[pdu_idx];
pdu_idx += 1;
// Ruma/PduEvent/StateEvent satisfies - 1. Is a valid event, otherwise it is dropped.
// state-res checks signatures - 2. Passes signature checks, otherwise event is dropped.
// 3. Passes hash checks, otherwise it is redacted before being processed further.
// TODO: redact event if hashing fails
let (event_id, value) = crate::pdu::process_incoming_pdu(pdu);
let pdu = serde_json::from_value::<PduEvent>(
let mut pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"),
)
.expect("all ruma pdus are conduit pdus");
@ -466,128 +498,469 @@ pub async fn send_transaction_message_route<'a>( @@ -466,128 +498,469 @@ pub async fn send_transaction_message_route<'a>(
continue;
}
// If it is not a state event, we can skip state-res... maybe
if value.get("state_key").is_none() {
if !db.rooms.is_joined(&pdu.sender, room_id)? {
warn!("Sender is not joined {}", pdu.kind);
resolved_map.insert(event_id, Err("User is not in this room".into()));
continue;
}
let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// The events that must be resolved to catch up to the incoming event
let mut missing = vec![];
let mut seen = state_res::StateMap::new();
let mut seen_id = None;
let mut prev_ids = vec![PrevEvents::new(&pdu.prev_events)];
// This is `while let Some(event_id) = prev_ids.pop_front()` but with a fancy continue
// in the case of a failed request to the server that sent the event
'inner: loop {
match prev_ids.pop() {
Some(PrevEvents::Sequential(id)) => match db
.rooms
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
{
// We found a common ancestor
Some(state_hash) => {
seen_id = Some(id.clone());
seen = db.rooms.state_full(&state_hash)?;
if let Some(pdu) = db.rooms.get_pdu(&id)? {
if pdu.state_key.is_some() {
// This becomes the state after the common event
seen.insert(
(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
pdu,
);
}
}
break 'inner;
}
// We need to fill in information about this event's `prev_events` (parents)
None => {
match send_request(
&db.globals,
body.body.origin.clone(),
ruma::api::federation::event::get_event::v1::Request::new(&id),
)
.await
{
Ok(res) => {
let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu);
let prev_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val)
.expect("CanonicalJsonObj is a valid JsonValue"),
)
.expect("all ruma pdus are conduit pdus");
// TODO: do we need this
assert_eq!(room_id, &prev_pdu.room_id);
prev_ids.push(PrevEvents::new(&prev_pdu.prev_events));
missing.push(PrevEvents::Sequential(prev_pdu));
}
// We can't hard fail because there are some valid errors, just
// keep checking PDU's
//
// As an example a possible error
// {"errcode":"M_FORBIDDEN","error":"Host not in room."}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
// We have to give up on this PDU
continue 'outer;
}
};
}
},
Some(PrevEvents::Fork(ids)) => {
error!(
"prev_events > 1: {}",
serde_json::to_string_pretty(&pdu).unwrap()
);
// Don't kill our server with state-res
// TODO: set this at a reasonable level this is for debug/wip purposes
if ids.len() > 5 {
error!(
"prev_events > 1: {}",
serde_json::to_string_pretty(&pdu).unwrap()
);
resolved_map.insert(
event_id,
Err("Previous events are too large for state-res".into()),
);
continue 'outer;
}
db.rooms.append_to_state(&pdu_id, &pdu)?;
// We want this to stay unique incase the fork comes together?
let mut prev_fork_ids = BTreeSet::new();
let mut missing_fork = vec![];
for id in &ids {
match db
.rooms
.pdu_state_hash(&db.rooms.get_pdu_id(&id)?.unwrap_or_default())?
{
// We found a common ancestor
Some(state_hash) => {
seen_id = Some(id.clone());
seen = db.rooms.state_full(&state_hash)?;
if let Some(pdu) = db.rooms.get_pdu(&id)? {
if pdu.state_key.is_some() {
// This becomes the state after the common event
seen.insert(
(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
pdu,
);
}
}
break 'inner;
}
None => match send_request(
&db.globals,
body.body.origin.clone(),
ruma::api::federation::event::get_event::v1::Request::new(&id),
)
.await
{
Ok(res) => {
let (_, val) = crate::pdu::process_incoming_pdu(&res.pdu);
let prev_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val)
.expect("CanonicalJsonObj is a valid JsonValue"),
)
.expect("all ruma pdus are conduit pdus");
// TODO: do we need this
assert_eq!(room_id, &prev_pdu.room_id);
for id in &prev_pdu.prev_events {
prev_fork_ids.insert(id.clone());
}
missing_fork.push(prev_pdu);
}
// We can't hard fail because there are some valid errors, just
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
// We have to give up on this PDU
continue 'outer;
}
},
}
}
prev_ids.push(PrevEvents::new(
&prev_fork_ids.into_iter().collect::<Vec<_>>(),
));
missing.push(PrevEvents::new(&missing_fork));
}
// All done finding missing events
None => {
break 'inner;
}
}
}
db.rooms.append_pdu(
&pdu,
&value,
count,
pdu_id.into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
// We can treat this event as sequential and simply apply it against the current state of the room
// because we know that state
if missing.is_empty() {
// Back to the original incoming event
// If it is a non state event we still must add it and associate a statehash with the pdu_id
if value.get("state_key").is_none() {
// TODO: Some auth needs to be done for non state events
if !db.rooms.is_joined(&pdu.sender, room_id)? {
error!("Sender is not joined {}", pdu.kind);
resolved_map.insert(event_id, Err("Sender not found in room".into()));
continue 'outer;
}
resolved_map.insert(event_id, Ok::<(), String>(()));
continue;
append_state(&db, &pdu)?;
resolved_map.insert(event_id, Ok::<(), String>(()));
continue 'outer;
} else {
let incoming = pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
incoming.clone(),
&seen
.iter()
.map(|(k, v)| (k.clone(), v.event_id.clone()))
.collect::<BTreeMap<_, _>>(),
Some(
seen.iter()
.map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res()))
.collect::<BTreeMap<_, _>>(),
), // TODO: make mut and keep around, this is all the auth events
&db.rooms,
) {
Ok(true) => {
append_state(&db, &pdu)?;
resolved_map.insert(event_id, Ok::<(), String>(()));
continue 'outer;
}
Ok(false) => {
resolved_map.insert(event_id, Err("Failed event auth".to_string()));
error!("Failed sequential event auth for incoming");
continue 'outer;
}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
error!("{}", err);
continue 'outer;
}
}
}
}
// We have a state event so we need info for state-res
let get_state_response = match send_request(
&db.globals,
body.body.origin.clone(),
ruma::api::federation::event::get_room_state::v1::Request {
room_id,
event_id: &event_id,
},
)
.await
{
Ok(res) => res,
// We can't hard fail because there are some valid errors, just
// keep checking PDU's
//
// As an example a possible error
// {"errcode":"M_FORBIDDEN","error":"Host not in room."}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
continue;
}
};
// Well, now we have to actually do a bunch of work :(
// The steps are as follows
// 1. Rebuild the sending servers forward extremity, ignoring our own fork
// a) iterate "oldest" -> most recent authenticating each event with the state after the previous
// b) build a `snapshot_map` containing the state after the event for each missing event (EventId -> StateMap<EventId>)
// 2. Build our side of the fork (TODO do we have to re-auth these, is state at an event relative to the server its from)
// 3. resolve the two states (our current with the state after the most recent missing event)
let their_current_state = get_state_response
.pdus
// Now build up state
let mut state_snapshot = seen
.iter()
.chain(get_state_response.auth_chain.iter()) // add auth events
.map(|pdu| {
let (event_id, json) = crate::pdu::process_incoming_pdu(pdu);
(
event_id.clone(),
Arc::new(
// When creating a StateEvent the event_id arg will be used
// over any found in the json and it will not use ruma::reference_hash
// to generate one
state_res::StateEvent::from_id_canon_obj(event_id, json)
.expect("valid pdu json"),
),
)
})
.map(|(k, v)| (k.clone(), v.event_id.clone()))
.collect::<BTreeMap<_, _>>();
let our_current_state = db.rooms.room_state_full(room_id)?;
// State resolution takes care of these checks
// TODO: So this is super memory inefficient we clone the state_snapshot every time
// we need it for state events and non
let mut snapshot_map = BTreeMap::new();
snapshot_map.insert(seen_id.clone().unwrap(), state_snapshot.clone());
let accum_event_map = seen
.iter()
.map(|(_, v)| (v.event_id.clone(), v.convert_for_state_res()))
.chain(
missing
.iter()
.cloned()
.flatten()
.map(|pdu| (pdu.event_id.clone(), pdu.convert_for_state_res())),
)
.collect::<BTreeMap<_, Arc<state_res::StateEvent>>>();
// 4. Passes authorization rules based on the event's auth events, otherwise it is rejected.
// 5. Passes authorization rules based on the state at the event, otherwise it is rejected.
for missing_pdu in missing.iter().rev() {
match missing_pdu {
PrevEvents::Sequential(missing_pdu) => {
// For state events
if missing_pdu.state_key.is_some() {
let missing_pdu = missing_pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
missing_pdu.clone(),
&state_snapshot,
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(true) => {
// TODO: do we need this
assert_eq!(room_id, missing_pdu.room_id());
// We can't add to DB yet since we don't know if it passes
// current room state
// append_state(&db, &PduEvent::from(&*missing_pdu))?;
// Only after the state is recorded in the DB can we update the state_snapshot
// This will update the state snapshot so it is correct next loop through
state_snapshot.insert(
(missing_pdu.kind(), missing_pdu.state_key()),
missing_pdu.event_id(),
);
// Keep track of the state after for resolution
snapshot_map.insert(missing_pdu.event_id(), state_snapshot.clone());
}
Ok(false) => {
error!(
"apply missing: {}",
serde_json::to_string_pretty(&*missing_pdu).unwrap()
);
continue;
}
Err(e) => {
error!("{}", e);
// This is not a fatal error but we do eventually need to handle
// events failing that are not the incoming events
// TODO: what to do when missing events fail (not incoming events)
}
}
} else {
// TODO: Some auth needs to be done for non state events
if !db
.rooms
.is_joined(&missing_pdu.sender, &missing_pdu.room_id)?
{
error!("fork Sender is not joined {}", missing_pdu.kind);
// TODO: we probably should not be getting events for different rooms
continue;
}
// TODO: 6. Passes authorization rules based on the current state of the room, otherwise it is "soft failed".
match state_res::StateResolution::resolve(
room_id,
&ruma::RoomVersionId::Version6,
&[
our_current_state
.iter()
.map(|((ev, sk), v)| ((ev.clone(), sk.to_owned()), v.event_id.clone()))
.collect::<BTreeMap<_, _>>(),
their_current_state
.iter()
.map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id()))
.collect::<BTreeMap<_, _>>(),
],
Some(
our_current_state
.iter()
.map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res()))
.chain(
their_current_state
.iter()
.map(|(id, ev)| (id.clone(), ev.clone())),
)
.collect::<BTreeMap<_, _>>(),
),
&db.rooms,
) {
Ok(resolved) if resolved.values().any(|id| &event_id == id) => {
// If the event is older than the last event in pduid_pdu Tree then find the
// closest ancestor we know of and insert after the known ancestor by
// altering the known events pduid to = same roomID + same count bytes + 0x1
// pushing a single byte every time a simple append cannot be done.
match db.rooms.get_latest_pduid_before(
room_id,
&pdu.prev_events,
&their_current_state,
)? {
Some(ClosestParent::Append) => {
// TODO: a better way to signal non state events...
snapshot_map.insert(missing_pdu.event_id.clone(), state_snapshot.clone());
// Continue this inner for loop adding all the missing events
}
}
PrevEvents::Fork(pdus) => {
let mut state_sets = BTreeSet::new();
for pdu in pdus {
let mut state_at = state_snapshot.clone();
if pdu.state_key.is_some() {
state_at.insert(
(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
pdu.event_id.clone(),
);
}
state_sets.insert(state_at);
}
if state_sets.len() <= 1 {
for missing_pdu in pdus.iter() {
// For state events
if missing_pdu.state_key.is_some() {
let missing_pdu = missing_pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
missing_pdu.clone(),
&state_snapshot,
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(true) => {
// TODO: do we need this
assert_eq!(room_id, missing_pdu.room_id());
state_snapshot.insert(
(missing_pdu.kind(), missing_pdu.state_key()),
missing_pdu.event_id(),
);
snapshot_map
.insert(missing_pdu.event_id(), state_snapshot.clone());
}
Ok(false) => {
error!(
"apply missing fork: {}",
serde_json::to_string_pretty(&*missing_pdu).unwrap()
);
continue;
}
Err(e) => {
error!("fork state-res: {}", e);
// TODO: what to do when missing events fail (not incoming events)
}
}
} else {
// TODO: Some auth needs to be done for non state events
if !db
.rooms
.is_joined(&missing_pdu.sender, &missing_pdu.room_id)?
{
error!("fork Sender is not joined {}", missing_pdu.kind);
// TODO: we probably should not be getting events for different rooms
continue;
}
snapshot_map
.insert(missing_pdu.event_id.clone(), state_snapshot.clone());
// Continue this inner for loop adding all the missing events
}
}
} else {
match state_res::StateResolution::resolve(
room_id,
&RoomVersionId::Version6,
&state_sets.into_iter().collect::<Vec<_>>(),
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(resolved) => {
for id in resolved
.values()
.filter(|id| pdus.iter().any(|pduid| pduid.event_id == **id))
{
snapshot_map.insert(id.clone(), resolved.clone());
}
}
Err(err) => {
error!("{}", err);
}
}
}
}
}
}
// We know these events are valid for each state at the event
// we have already authed them
let known_fork = if let Some(id) = seen_id {
db.rooms.state_from(room_id, &id)?
} else {
vec![]
};
for pdu in known_fork.clone().into_iter().rev() {
if pdu.state_key.is_some() {
seen.insert((pdu.kind.clone(), pdu.state_key.clone().unwrap()), pdu);
}
}
let current_room_state = seen
.into_iter()
.map(|(k, v)| (k, v.event_id))
.collect::<BTreeMap<_, _>>();
let mut state_sets = BTreeSet::new();
state_sets.insert(current_room_state.clone());
// The first item is the most recent
if let Some(fork) = missing.first() {
for pdu in fork.clone().into_iter() {
if let Some(map) = snapshot_map.get(&pdu.event_id) {
state_sets.insert(map.clone());
}
}
}
let mut state_sets = state_sets.into_iter().collect::<Vec<_>>();
// If we have actual differences resolve
if state_sets.len() > 1 {
// Set the incoming event to have parents from both forks
// ours/theirs
pdu.prev_events = missing
.first()
.cloned()
.into_iter()
.flatten()
.map(|pdu| pdu.event_id)
.chain(known_fork.first().cloned().map(|pdu| pdu.event_id))
.collect();
// The incoming event is a child of both forks now
for set in &mut state_sets {
set.insert(
(pdu.kind.clone(), pdu.state_key.clone().unwrap()),
pdu.event_id.clone(),
);
}
// If we have holes or a fork I am less sure what can be guaranteed about our state?
// Or what must be done to fix holes and forks?
match state_res::StateResolution::resolve(
room_id,
&RoomVersionId::Version6,
&state_sets,
Some(accum_event_map.clone()), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(resolved) if resolved.values().any(|id| id == &event_id) => {
for resolved_missing in missing
.into_iter()
.rev() // We want the oldest pdu's first
.flatten()
.filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id))
{
let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_pdu(
&pdu,
&value,
&resolved_missing,
&crate::utils::to_canonical_object(&resolved_missing)
.expect("PDU is valid canonical JSON"),
count,
pdu_id.into(),
&db.globals,
@ -595,18 +968,58 @@ pub async fn send_transaction_message_route<'a>( @@ -595,18 +968,58 @@ pub async fn send_transaction_message_route<'a>(
&db.admin,
)?;
}
Some(ClosestParent::Insert(old_count)) => {
let count = old_count;
let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// Since we know the state we force_state
// saving the incoming pdu's id with our new state
db.rooms.force_state_with_pdu(
&*pdu_id,
room_id,
resolved
.iter()
.map(|(k, v)| {
(
k.clone(),
serde_json::to_vec(
&db.rooms
.get_pdu(v)
.expect("db err")
.expect("we know of all the pdus"),
)
.expect("serde can serialize pdus"),
)
})
.collect(),
)?;
db.rooms.append_pdu(
&pdu,
&value,
count,
pdu_id.into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
resolved_map.insert(event_id, Ok::<(), String>(()));
}
Ok(resolved) => {
for resolved_missing in missing
.into_iter()
.flatten()
.filter(|pdu| resolved.values().any(|res_id| res_id == &pdu.event_id))
{
let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// Create a new count that is after old_count but before
// the pdu appended after
pdu_id.push(1);
db.rooms.append_pdu(
&pdu,
&value,
&resolved_missing,
&crate::utils::to_canonical_object(&resolved_missing)
.expect("PDU is valid canonical JSON"),
count,
pdu_id.into(),
&db.globals,
@ -614,28 +1027,78 @@ pub async fn send_transaction_message_route<'a>( @@ -614,28 +1027,78 @@ pub async fn send_transaction_message_route<'a>(
&db.admin,
)?;
}
_ => {
error!("Not a sequential event or no parents found");
continue;
}
resolved_map.insert(event_id, Err("Failed event auth".into()));
error!(
"auth failed: {}",
serde_json::to_string_pretty(&pdu).unwrap()
);
}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
error!("{}", err);
}
}
// If we have iterated through the incoming missing events sequentially we know that
// the original incoming event is the youngest child and so can be simply authed and appended
// to the state
} else {
// If it is a non state event we still must add it and associate a statehash with the pdu_id
if pdu.state_key.is_none() {
// TODO: Some auth needs to be done for non state events
if !db.rooms.is_joined(&pdu.sender, room_id)? {
error!("Sender is not joined {}", pdu.kind);
resolved_map.insert(event_id, Err("Sender not found in room".into()));
continue 'outer;
}
append_state(&db, &pdu)?;
resolved_map.insert(event_id, Ok::<(), String>(()));
} else {
let incoming = pdu.convert_for_state_res();
match state_res::StateResolution::apply_event(
room_id,
&RoomVersionId::Version6,
incoming,
&current_room_state,
Some(accum_event_map), // TODO: make mut and check on Ok(..) ?
&db.rooms,
) {
Ok(true) => {
append_state(&db, &pdu)?;
resolved_map.insert(event_id, Ok::<(), String>(()));
}
Ok(false) => {
resolved_map.insert(event_id, Err("Failed event auth".to_string()));
error!("Failed sequential event auth for incoming");
}
Err(err) => {
resolved_map.insert(event_id, Err(err.to_string()));
error!("{}", err);
}
}
}
// If the eventId is not found in the resolved state auth has failed
Ok(_) => {
resolved_map.insert(
event_id,
Err("This event failed authentication, not found in resolved set".into()),
);
}
Err(e) => {
resolved_map.insert(event_id, Err(e.to_string()));
}
};
}
}
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())
}
fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_to_state(&pdu_id, pdu)?;
db.rooms.append_pdu(
pdu,
&utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
&db.globals,
&db.account_data,
&db.admin,
)
}
#[cfg_attr(

Loading…
Cancel
Save