Compare commits

...

1 Commits
next ... timo

Author SHA1 Message Date
Timo Kösters bd1172835e
TODO: commit message 4 years ago
  1. 62
      Cargo.lock
  2. 5
      Cargo.toml
  3. 161
      src/client_server/account.rs
  4. 225
      src/client_server/directory.rs
  5. 397
      src/client_server/membership.rs
  6. 92
      src/client_server/profile.rs
  7. 12
      src/client_server/redact.rs
  8. 177
      src/client_server/room.rs
  9. 10
      src/client_server/state.rs
  10. 18
      src/client_server/sync.rs
  11. 8
      src/database/admin.rs
  12. 30
      src/database/pusher.rs
  13. 227
      src/database/rooms.rs
  14. 10
      src/database/sending.rs
  15. 89
      src/pdu.rs
  16. 201
      src/server_server.rs

62
Cargo.lock generated

@ -254,6 +254,7 @@ dependencies = [
"opentelemetry-jaeger", "opentelemetry-jaeger",
"parking_lot", "parking_lot",
"rand 0.8.4", "rand 0.8.4",
"rayon",
"regex", "regex",
"reqwest", "reqwest",
"ring", "ring",
@ -1747,6 +1748,31 @@ dependencies = [
"rand_core 0.6.3", "rand_core 0.6.3",
] ]
[[package]]
name = "rayon"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils 0.8.5",
"lazy_static",
"num_cpus",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.10" version = "0.2.10"
@ -1968,7 +1994,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma" name = "ruma"
version = "0.4.0" version = "0.4.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"assign", "assign",
"js_int", "js_int",
@ -1989,7 +2015,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-api" name = "ruma-api"
version = "0.18.3" version = "0.18.3"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"bytes", "bytes",
"http", "http",
@ -2005,7 +2031,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-api-macros" name = "ruma-api-macros"
version = "0.18.3" version = "0.18.3"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"proc-macro-crate", "proc-macro-crate",
"proc-macro2", "proc-macro2",
@ -2016,7 +2042,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-appservice-api" name = "ruma-appservice-api"
version = "0.4.0" version = "0.4.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"ruma-api", "ruma-api",
"ruma-common", "ruma-common",
@ -2030,7 +2056,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-client-api" name = "ruma-client-api"
version = "0.12.2" version = "0.12.2"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"assign", "assign",
"bytes", "bytes",
@ -2050,7 +2076,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-common" name = "ruma-common"
version = "0.6.0" version = "0.6.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"js_int", "js_int",
@ -2065,7 +2091,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-events" name = "ruma-events"
version = "0.24.5" version = "0.24.5"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"indoc", "indoc",
"js_int", "js_int",
@ -2081,7 +2107,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-events-macros" name = "ruma-events-macros"
version = "0.24.5" version = "0.24.5"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"proc-macro-crate", "proc-macro-crate",
"proc-macro2", "proc-macro2",
@ -2092,7 +2118,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-federation-api" name = "ruma-federation-api"
version = "0.3.1" version = "0.3.1"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-api", "ruma-api",
@ -2107,7 +2133,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identifiers" name = "ruma-identifiers"
version = "0.20.0" version = "0.20.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"paste", "paste",
"rand 0.8.4", "rand 0.8.4",
@ -2121,7 +2147,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identifiers-macros" name = "ruma-identifiers-macros"
version = "0.20.0" version = "0.20.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"quote", "quote",
"ruma-identifiers-validation", "ruma-identifiers-validation",
@ -2131,12 +2157,12 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identifiers-validation" name = "ruma-identifiers-validation"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
[[package]] [[package]]
name = "ruma-identity-service-api" name = "ruma-identity-service-api"
version = "0.3.0" version = "0.3.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-api", "ruma-api",
@ -2149,7 +2175,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-push-gateway-api" name = "ruma-push-gateway-api"
version = "0.3.0" version = "0.3.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-api", "ruma-api",
@ -2164,7 +2190,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-serde" name = "ruma-serde"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"bytes", "bytes",
"form_urlencoded", "form_urlencoded",
@ -2178,7 +2204,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-serde-macros" name = "ruma-serde-macros"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"proc-macro-crate", "proc-macro-crate",
"proc-macro2", "proc-macro2",
@ -2189,7 +2215,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-signatures" name = "ruma-signatures"
version = "0.9.0" version = "0.9.0"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"ed25519-dalek", "ed25519-dalek",
@ -2206,7 +2232,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-state-res" name = "ruma-state-res"
version = "0.4.1" version = "0.4.1"
source = "git+https://github.com/ruma/ruma?rev=a6a1224652912a957b09f136ec5da2686be6e0e2#a6a1224652912a957b09f136ec5da2686be6e0e2" source = "git+https://github.com/timokoesters/ruma?rev=6e9823de284967a41100db0e3134319f4b6a0cfa#6e9823de284967a41100db0e3134319f4b6a0cfa"
dependencies = [ dependencies = [
"itertools 0.10.1", "itertools 0.10.1",
"js_int", "js_int",

5
Cargo.toml

@ -19,8 +19,8 @@ rocket = { version = "0.5.0-rc.1", features = ["tls"] } # Used to handle request
# Used for matrix spec type definitions and helpers # Used for matrix spec type definitions and helpers
#ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
ruma = { git = "https://github.com/ruma/ruma", rev = "a6a1224652912a957b09f136ec5da2686be6e0e2", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { git = "https://github.com/ruma/ruma", rev = "a6a1224652912a957b09f136ec5da2686be6e0e2", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
#ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } ruma = { git = "https://github.com/timokoesters/ruma", rev = "6e9823de284967a41100db0e3134319f4b6a0cfa", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
# Used for long polling and federation sender, should be the same as rocket::tokio # Used for long polling and federation sender, should be the same as rocket::tokio
@ -79,6 +79,7 @@ num_cpus = "1.13.0"
threadpool = "1.8.1" threadpool = "1.8.1"
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
thread_local = "1.1.3" thread_local = "1.1.3"
rayon = "1.5.1"
[features] [features]
default = ["conduit_bin", "backend_sqlite"] default = ["conduit_bin", "backend_sqlite"]

161
src/client_server/account.rs

@ -28,6 +28,7 @@ use ruma::{
identifiers::RoomName, identifiers::RoomName,
push, RoomAliasId, RoomId, RoomVersionId, UserId, push, RoomAliasId, RoomId, RoomVersionId, UserId,
}; };
use serde_json::value::RawValue;
use tracing::info; use tracing::info;
use register::RegistrationKind; use register::RegistrationKind;
@ -279,7 +280,10 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomCreate, event_type: EventType::RoomCreate,
content: serde_json::to_value(content).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&content).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -294,16 +298,19 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent { content: RawValue::from_string(
membership: member::MembershipState::Join, serde_json::to_string(&member::MemberEventContent {
displayname: None, membership: member::MembershipState::Join,
avatar_url: None, displayname: None,
is_direct: None, avatar_url: None,
third_party_invite: None, is_direct: None,
blurhash: None, third_party_invite: None,
reason: None, blurhash: None,
}) reason: None,
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(conduit_user.to_string()), state_key: Some(conduit_user.to_string()),
redacts: None, redacts: None,
@ -322,13 +329,16 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomPowerLevels, event_type: EventType::RoomPowerLevels,
content: serde_json::to_value( content: RawValue::from_string(
ruma::events::room::power_levels::PowerLevelsEventContent { serde_json::to_string(
users, &ruma::events::room::power_levels::PowerLevelsEventContent {
..Default::default() users,
}, ..Default::default()
},
)
.expect("event is valid, we just created it"),
) )
.expect("event is valid, we just created it"), .expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -343,10 +353,13 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomJoinRules, event_type: EventType::RoomJoinRules,
content: serde_json::to_value(join_rules::JoinRulesEventContent::new( content: RawValue::from_string(
join_rules::JoinRule::Invite, serde_json::to_string(&join_rules::JoinRulesEventContent::new(
)) join_rules::JoinRule::Invite,
.expect("event is valid, we just created it"), ))
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -361,12 +374,13 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomHistoryVisibility, event_type: EventType::RoomHistoryVisibility,
content: serde_json::to_value( content: RawValue::from_string(
history_visibility::HistoryVisibilityEventContent::new( serde_json::to_string(&history_visibility::HistoryVisibilityEventContent::new(
history_visibility::HistoryVisibility::Shared, history_visibility::HistoryVisibility::Shared,
), ))
.expect("event is valid, we just created it"),
) )
.expect("event is valid, we just created it"), .expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -381,10 +395,13 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomGuestAccess, event_type: EventType::RoomGuestAccess,
content: serde_json::to_value(guest_access::GuestAccessEventContent::new( content: RawValue::from_string(
guest_access::GuestAccess::Forbidden, serde_json::to_string(&guest_access::GuestAccessEventContent::new(
)) guest_access::GuestAccess::Forbidden,
.expect("event is valid, we just created it"), ))
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -402,8 +419,11 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomName, event_type: EventType::RoomName,
content: serde_json::to_value(name::NameEventContent::new(Some(room_name))) content: RawValue::from_string(
.expect("event is valid, we just created it"), serde_json::to_string(&name::NameEventContent::new(Some(room_name)))
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -417,10 +437,13 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomTopic, event_type: EventType::RoomTopic,
content: serde_json::to_value(topic::TopicEventContent { content: RawValue::from_string(
topic: format!("Manage {}", db.globals.server_name()), serde_json::to_string(&topic::TopicEventContent {
}) topic: format!("Manage {}", db.globals.server_name()),
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -439,11 +462,14 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomCanonicalAlias, event_type: EventType::RoomCanonicalAlias,
content: serde_json::to_value(canonical_alias::CanonicalAliasEventContent { content: RawValue::from_string(
alias: Some(alias.clone()), serde_json::to_string(&canonical_alias::CanonicalAliasEventContent {
alt_aliases: Vec::new(), alias: Some(alias.clone()),
}) alt_aliases: Vec::new(),
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -460,16 +486,19 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent { content: RawValue::from_string(
membership: member::MembershipState::Invite, serde_json::to_string(&member::MemberEventContent {
displayname: None, membership: member::MembershipState::Invite,
avatar_url: None, displayname: None,
is_direct: None, avatar_url: None,
third_party_invite: None, is_direct: None,
blurhash: None, third_party_invite: None,
reason: None, blurhash: None,
}) reason: None,
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(user_id.to_string()), state_key: Some(user_id.to_string()),
redacts: None, redacts: None,
@ -482,16 +511,19 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent { content: RawValue::from_string(
membership: member::MembershipState::Join, serde_json::to_string(&member::MemberEventContent {
displayname: Some(displayname), membership: member::MembershipState::Join,
avatar_url: None, displayname: Some(displayname),
is_direct: None, avatar_url: None,
third_party_invite: None, is_direct: None,
blurhash: None, third_party_invite: None,
reason: None, blurhash: None,
}) reason: None,
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(user_id.to_string()), state_key: Some(user_id.to_string()),
redacts: None, redacts: None,
@ -506,11 +538,11 @@ pub async fn register_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMessage, event_type: EventType::RoomMessage,
content: serde_json::to_value(message::MessageEventContent::text_html( content: RawValue::from_string(serde_json::to_string(&message::MessageEventContent::text_html(
"## Thank you for trying out Conduit!\n\nConduit is currently in Beta. This means you can join and participate in most Matrix rooms, but not all features are supported and you might run into bugs from time to time.\n\nHelpful links:\n> Website: https://conduit.rs\n> Git and Documentation: https://gitlab.com/famedly/conduit\n> Report issues: https://gitlab.com/famedly/conduit/-/issues\n\nHere are some rooms you can join (by typing the command):\n\nConduit room (Ask questions and get notified on updates):\n`/join #conduit:fachschaften.org`\n\nConduit lounge (Off-topic, only Conduit users are allowed to join)\n`/join #conduit-lounge:conduit.rs`".to_owned(), "## Thank you for trying out Conduit!\n\nConduit is currently in Beta. This means you can join and participate in most Matrix rooms, but not all features are supported and you might run into bugs from time to time.\n\nHelpful links:\n> Website: https://conduit.rs\n> Git and Documentation: https://gitlab.com/famedly/conduit\n> Report issues: https://gitlab.com/famedly/conduit/-/issues\n\nHere are some rooms you can join (by typing the command):\n\nConduit room (Ask questions and get notified on updates):\n`/join #conduit:fachschaften.org`\n\nConduit lounge (Off-topic, only Conduit users are allowed to join)\n`/join #conduit-lounge:conduit.rs`".to_owned(),
"<h2>Thank you for trying out Conduit!</h2>\n<p>Conduit is currently in Beta. This means you can join and participate in most Matrix rooms, but not all features are supported and you might run into bugs from time to time.</p>\n<p>Helpful links:</p>\n<blockquote>\n<p>Website: https://conduit.rs<br>Git and Documentation: https://gitlab.com/famedly/conduit<br>Report issues: https://gitlab.com/famedly/conduit/-/issues</p>\n</blockquote>\n<p>Here are some rooms you can join (by typing the command):</p>\n<p>Conduit room (Ask questions and get notified on updates):<br><code>/join #conduit:fachschaften.org</code></p>\n<p>Conduit lounge (Off-topic, only Conduit users are allowed to join)<br><code>/join #conduit-lounge:conduit.rs</code></p>\n".to_owned(), "<h2>Thank you for trying out Conduit!</h2>\n<p>Conduit is currently in Beta. This means you can join and participate in most Matrix rooms, but not all features are supported and you might run into bugs from time to time.</p>\n<p>Helpful links:</p>\n<blockquote>\n<p>Website: https://conduit.rs<br>Git and Documentation: https://gitlab.com/famedly/conduit<br>Report issues: https://gitlab.com/famedly/conduit/-/issues</p>\n</blockquote>\n<p>Here are some rooms you can join (by typing the command):</p>\n<p>Conduit room (Ask questions and get notified on updates):<br><code>/join #conduit:fachschaften.org</code></p>\n<p>Conduit lounge (Off-topic, only Conduit users are allowed to join)<br><code>/join #conduit-lounge:conduit.rs</code></p>\n".to_owned(),
)) ))
.expect("event is valid, we just created it"), .expect("event is valid, we just created it")).expect("string is valid"),
unsigned: None, unsigned: None,
state_key: None, state_key: None,
redacts: None, redacts: None,
@ -721,7 +753,10 @@ pub async fn deactivate_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&event).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(sender_user.to_string()), state_key: Some(sender_user.to_string()),
redacts: None, redacts: None,

225
src/client_server/directory.rs

@ -20,7 +20,6 @@ use ruma::{
room::{avatar, canonical_alias, guest_access, history_visibility, name, topic}, room::{avatar, canonical_alias, guest_access, history_visibility, name, topic},
EventType, EventType,
}, },
serde::Raw,
ServerName, UInt, ServerName, UInt,
}; };
use tracing::{info, warn}; use tracing::{info, warn};
@ -217,76 +216,68 @@ pub(crate) async fn get_public_rooms_filtered_helper(
} }
} }
let mut all_rooms = let mut all_rooms = db
db.rooms .rooms
.public_rooms() .public_rooms()
.map(|room_id| { .map(|room_id| {
let room_id = room_id?; let room_id = room_id?;
let chunk = PublicRoomsChunk { let chunk = PublicRoomsChunk {
aliases: Vec::new(), aliases: Vec::new(),
canonical_alias: db canonical_alias: db
.rooms .rooms
.room_state_get(&room_id, &EventType::RoomCanonicalAlias, "")? .room_state_get(&room_id, &EventType::RoomCanonicalAlias, "")?
.map_or(Ok::<_, Error>(None), |s| { .map_or(Ok::<_, Error>(None), |s| {
Ok(serde_json::from_value::< Ok(
Raw<canonical_alias::CanonicalAliasEventContent>, serde_json::from_str::<canonical_alias::CanonicalAliasEventContent>(
>(s.content.clone()) s.content.get(),
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| {
Error::bad_database("Invalid canonical alias event in database.")
})?
.alias)
})?,
name: db
.rooms
.room_state_get(&room_id, &EventType::RoomName, "")?
.map_or(Ok::<_, Error>(None), |s| {
Ok(serde_json::from_value::<Raw<name::NameEventContent>>(
s.content.clone(),
) )
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| { .map_err(|_| {
Error::bad_database("Invalid room name event in database.") Error::bad_database("Invalid canonical alias event in database.")
})? })?
.name) .alias,
})?, )
num_joined_members: db })?,
.rooms name: db
.room_joined_count(&room_id)? .rooms
.unwrap_or_else(|| { .room_state_get(&room_id, &EventType::RoomName, "")?
warn!("Room {} has no member count", room_id); .map_or(Ok::<_, Error>(None), |s| {
0 Ok(
}) serde_json::from_str::<name::NameEventContent>(s.content.get())
.try_into() .map_err(|_| {
.expect("user count should not be that big"), Error::bad_database("Invalid room name event in database.")
topic: db })?
.rooms .name,
.room_state_get(&room_id, &EventType::RoomTopic, "")? )
.map_or(Ok::<_, Error>(None), |s| { })?,
Ok(Some( num_joined_members: db
serde_json::from_value::<Raw<topic::TopicEventContent>>( .rooms
s.content.clone(), .room_joined_count(&room_id)?
) .unwrap_or_else(|| {
.expect("from_value::<Raw<..>> can never fail") warn!("Room {} has no member count", room_id);
.deserialize() 0
})
.try_into()
.expect("user count should not be that big"),
topic: db
.rooms
.room_state_get(&room_id, &EventType::RoomTopic, "")?
.map_or(Ok::<_, Error>(None), |s| {
Ok(Some(
serde_json::from_str::<topic::TopicEventContent>(s.content.get())
.map_err(|_| { .map_err(|_| {
Error::bad_database("Invalid room topic event in database.") Error::bad_database("Invalid room topic event in database.")
})? })?
.topic, .topic,
)) ))
})?, })?,
world_readable: db world_readable: db
.rooms .rooms
.room_state_get(&room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&room_id, &EventType::RoomHistoryVisibility, "")?
.map_or(Ok::<_, Error>(false), |s| { .map_or(Ok::<_, Error>(false), |s| {
Ok(serde_json::from_value::< Ok(serde_json::from_str::<
Raw<history_visibility::HistoryVisibilityEventContent>, history_visibility::HistoryVisibilityEventContent,
>(s.content.clone()) >(s.content.get())
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
"Invalid room history visibility event in database.", "Invalid room history visibility event in database.",
@ -294,80 +285,74 @@ pub(crate) async fn get_public_rooms_filtered_helper(
})? })?
.history_visibility .history_visibility
== history_visibility::HistoryVisibility::WorldReadable) == history_visibility::HistoryVisibility::WorldReadable)
})?, })?,
guest_can_join: db guest_can_join: db
.rooms .rooms
.room_state_get(&room_id, &EventType::RoomGuestAccess, "")? .room_state_get(&room_id, &EventType::RoomGuestAccess, "")?
.map_or(Ok::<_, Error>(false), |s| { .map_or(Ok::<_, Error>(false), |s| {
Ok( Ok(
serde_json::from_value::<Raw<guest_access::GuestAccessEventContent>>( serde_json::from_str::<guest_access::GuestAccessEventContent>(
s.content.clone(), s.content.get(),
) )
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| { .map_err(|_| {
Error::bad_database("Invalid room guest access event in database.") Error::bad_database("Invalid room guest access event in database.")
})? })?
.guest_access .guest_access
== guest_access::GuestAccess::CanJoin, == guest_access::GuestAccess::CanJoin,
) )
})?, })?,
avatar_url: db avatar_url: db
.rooms .rooms
.room_state_get(&room_id, &EventType::RoomAvatar, "")? .room_state_get(&room_id, &EventType::RoomAvatar, "")?
.map(|s| { .map(|s| {
Ok::<_, Error>( Ok::<_, Error>(
serde_json::from_value::<Raw<avatar::AvatarEventContent>>( serde_json::from_str::<avatar::AvatarEventContent>(s.content.get())
s.content.clone(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| { .map_err(|_| {
Error::bad_database("Invalid room avatar event in database.") Error::bad_database("Invalid room avatar event in database.")
})? })?
.url, .url,
) )
}) })
.transpose()? .transpose()?
// url is now an Option<String> so we must flatten // url is now an Option<String> so we must flatten
.flatten(), .flatten(),
room_id, room_id,
}; };
Ok(chunk) Ok(chunk)
}) })
.filter_map(|r: Result<_>| r.ok()) // Filter out buggy rooms .filter_map(|r: Result<_>| r.ok()) // Filter out buggy rooms
.filter(|chunk| { .filter(|chunk| {
if let Some(query) = filter if let Some(query) = filter
.generic_search_term .generic_search_term
.as_ref() .as_ref()
.map(|q| q.to_lowercase()) .map(|q| q.to_lowercase())
{ {
if let Some(name) = &chunk.name { if let Some(name) = &chunk.name {
if name.as_str().to_lowercase().contains(&query) { if name.as_str().to_lowercase().contains(&query) {
return true; return true;
}
} }
}
if let Some(topic) = &chunk.topic { if let Some(topic) = &chunk.topic {
if topic.to_lowercase().contains(&query) { if topic.to_lowercase().contains(&query) {
return true; return true;
}
} }
}
if let Some(canonical_alias) = &chunk.canonical_alias { if let Some(canonical_alias) = &chunk.canonical_alias {
if canonical_alias.as_str().to_lowercase().contains(&query) { if canonical_alias.as_str().to_lowercase().contains(&query) {
return true; return true;
}
} }
false
} else {
// No search term
true
} }
})
// We need to collect all, so we can sort by member count false
.collect::<Vec<_>>(); } else {
// No search term
true
}
})
// We need to collect all, so we can sort by member count
.collect::<Vec<_>>();
all_rooms.sort_by(|l, r| r.num_joined_members.cmp(&l.num_joined_members)); all_rooms.sort_by(|l, r| r.num_joined_members.cmp(&l.num_joined_members));

397
src/client_server/membership.rs

@ -5,6 +5,7 @@ use crate::{
server_server, utils, ConduitResult, Database, Error, Result, Ruma, server_server, utils, ConduitResult, Database, Error, Result, Ruma,
}; };
use member::{MemberEventContent, MembershipState}; use member::{MemberEventContent, MembershipState};
use rayon::prelude::*;
use ruma::{ use ruma::{
api::{ api::{
client::{ client::{
@ -15,7 +16,10 @@ use ruma::{
unban_user, IncomingThirdPartySigned, unban_user, IncomingThirdPartySigned,
}, },
}, },
federation::{self, membership::create_invite}, federation::{
self,
membership::{create_invite, create_join_event},
},
}, },
events::{ events::{
pdu::Pdu, pdu::Pdu,
@ -26,13 +30,13 @@ use ruma::{
state_res::{self, RoomVersion}, state_res::{self, RoomVersion},
uint, EventId, RoomId, RoomVersionId, ServerName, UserId, uint, EventId, RoomId, RoomVersionId, ServerName, UserId,
}; };
use serde_json::value::RawValue;
use std::{ use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
sync::{Arc, RwLock}, sync::{Arc, RwLock},
time::{Duration, Instant},
}; };
use tracing::{debug, error, warn}; use tracing::{error, warn};
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
use rocket::{get, post}; use rocket::{get, post};
@ -204,7 +208,7 @@ pub async fn kick_user_route(
) -> ConduitResult<kick_user::Response> { ) -> ConduitResult<kick_user::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut event = serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>( let mut event = serde_json::from_str::<Raw<ruma::events::room::member::MemberEventContent>>(
db.rooms db.rooms
.room_state_get( .room_state_get(
&body.room_id, &body.room_id,
@ -216,7 +220,7 @@ pub async fn kick_user_route(
"Cannot kick member that's not in the room.", "Cannot kick member that's not in the room.",
))? ))?
.content .content
.clone(), .get(),
) )
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
.deserialize() .deserialize()
@ -238,7 +242,10 @@ pub async fn kick_user_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&event).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(body.user_id.to_string()), state_key: Some(body.user_id.to_string()),
redacts: None, redacts: None,
@ -290,12 +297,9 @@ pub async fn ban_user_route(
reason: None, reason: None,
}), }),
|event| { |event| {
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>( let mut event =
event.content.clone(), serde_json::from_str::<member::MemberEventContent>(event.content.get())
) .map_err(|_| Error::bad_database("Invalid member event in database."))?;
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = ruma::events::room::member::MembershipState::Ban; event.membership = ruma::events::room::member::MembershipState::Ban;
Ok(event) Ok(event)
}, },
@ -314,7 +318,10 @@ pub async fn ban_user_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&event).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(body.user_id.to_string()), state_key: Some(body.user_id.to_string()),
redacts: None, redacts: None,
@ -346,7 +353,7 @@ pub async fn unban_user_route(
) -> ConduitResult<unban_user::Response> { ) -> ConduitResult<unban_user::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut event = serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>( let mut event = serde_json::from_str::<ruma::events::room::member::MemberEventContent>(
db.rooms db.rooms
.room_state_get( .room_state_get(
&body.room_id, &body.room_id,
@ -358,10 +365,8 @@ pub async fn unban_user_route(
"Cannot unban a user who is not banned.", "Cannot unban a user who is not banned.",
))? ))?
.content .content
.clone(), .get(),
) )
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?; .map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = ruma::events::room::member::MembershipState::Leave; event.membership = ruma::events::room::member::MembershipState::Leave;
@ -379,7 +384,10 @@ pub async fn unban_user_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&event).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(body.user_id.to_string()), state_key: Some(body.user_id.to_string()),
redacts: None, redacts: None,
@ -563,6 +571,7 @@ async fn join_room_by_id_helper(
}, },
) )
.await; .await;
warn!("Make join done");
make_join_response_and_server = make_join_response.map(|r| (r, remote_server)); make_join_response_and_server = make_join_response.map(|r| (r, remote_server));
@ -658,93 +667,193 @@ async fn join_room_by_id_helper(
) )
.await?; .await?;
warn!("Send join done");
db.rooms.get_or_create_shortroomid(room_id, &db.globals)?; db.rooms.get_or_create_shortroomid(room_id, &db.globals)?;
let pdu = PduEvent::from_id_val(&event_id, join_event.clone()) let pdu = PduEvent::from_id_val(&event_id, join_event.clone())
.map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?; .map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?;
let mut state = HashMap::new(); let pub_key_map = Arc::new(RwLock::new(BTreeMap::new()));
let pub_key_map = RwLock::new(BTreeMap::new()); let missing_servers = Arc::new(RwLock::new(BTreeMap::new()));
server_server::fetch_join_signing_keys( let create_join_event::RoomState {
&send_join_response, state: mut room_state_state,
&room_version, auth_chain: mut room_state_auth_chain,
&pub_key_map, } = send_join_response.room_state;
db,
)
.await?;
for result in send_join_response
.room_state
.state
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, db))
{
let (event_id, value) = match result {
Ok(t) => t,
Err(_) => continue,
};
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
warn!("{:?}: {}", value, e);
Error::BadServerResponse("Invalid PDU in send_join response.")
})?;
db.rooms.add_pdu_outlier(&event_id, &value)?;
if let Some(state_key) = &pdu.state_key {
let shortstatekey =
db.rooms
.get_or_create_shortstatekey(&pdu.kind, state_key, &db.globals)?;
state.insert(shortstatekey, pdu.event_id.clone());
}
}
let incoming_shortstatekey = db.rooms.get_or_create_shortstatekey(
&pdu.kind,
pdu.state_key
.as_ref()
.expect("Pdu is a membership state event"),
&db.globals,
)?;
state.insert(incoming_shortstatekey, pdu.event_id.clone());
let create_shortstatekey = db let create_shortstatekey = db
.rooms .rooms
.get_shortstatekey(&EventType::RoomCreate, "")? .get_shortstatekey(&EventType::RoomCreate, "")?
.expect("Room exists"); .expect("Room exists");
if state.get(&create_shortstatekey).is_none() { let mut saw_create_event = false;
warn!("Parsing send join response state");
const CHUNK_SIZE: usize = 500;
let mut parsed_state = room_state_state
.par_chunks_mut(CHUNK_SIZE)
.filter_map(|pdus| {
let mut r = HashMap::with_capacity(CHUNK_SIZE);
for pdu in pdus {
let (id, value) = get_event_id(&pdu, &room_version).ok()?;
r.insert(id, value);
}
let mut missing_servers = missing_servers.write().unwrap();
let mut pub_key_map = pub_key_map.write().unwrap();
for (_, value) in &r {
server_server::get_server_keys_from_cache(
&value,
&mut missing_servers,
&mut pub_key_map,
&db,
)
.ok()?;
}
Some(r)
})
.collect::<Vec<_>>();
warn!("Parsing send join response auth chain");
let mut parsed_chain = room_state_auth_chain
.par_chunks_mut(CHUNK_SIZE)
.filter_map(|pdus| {
let mut r = HashMap::with_capacity(CHUNK_SIZE);
for pdu in pdus {
let (id, value) = get_event_id(&pdu, &room_version).ok()?;
r.insert(id, value);
}
let mut missing_servers = missing_servers.write().unwrap();
let mut pub_key_map = pub_key_map.write().unwrap();
for (_, value) in &r {
server_server::get_server_keys_from_cache(
&value,
&mut missing_servers,
&mut pub_key_map,
&db,
)
.ok()?;
}
Some(r)
})
.collect::<Vec<_>>();
warn!("Fetching send join signing keys");
server_server::fetch_join_signing_keys(missing_servers, &pub_key_map, db).await?;
warn!("Validating state");
parsed_state.par_iter_mut().for_each(|chunk| {
let mut bad_events = Vec::new();
for (event_id, value) in chunk.iter_mut() {
if let Err(e) = ruma::signatures::verify_event(
&*pub_key_map.read().unwrap(),
&value,
&room_version,
) {
warn!("Event {} failed verification {:?} {}", event_id, value, e);
bad_events.push(event_id.clone());
continue;
}
value.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
}
for id in bad_events {
chunk.remove(&id);
}
});
warn!("Inserting state");
db.rooms
.add_pdu_outlier_batch(&mut parsed_state.iter().flatten())?;
warn!("Compressing state");
let state = parsed_state
.iter()
.flatten()
.map(|(event_id, value)| {
let kind = if let Some(s) = value.get("type").and_then(|s| s.as_str()) {
s
} else {
warn!("Event {} has no type: {:?}", event_id, value);
return Ok(None);
};
if let Some(state_key) = value.get("state_key").and_then(|s| s.as_str()) {
let shortstatekey = db.rooms.get_or_create_shortstatekey(
&EventType::from(kind),
state_key,
&db.globals,
)?;
if shortstatekey == create_shortstatekey {
saw_create_event = true;
}
Ok(Some(db.rooms.compress_state_event(
shortstatekey,
&event_id,
&db.globals,
)?))
} else {
Ok(None)
}
})
.filter_map(|r| r.transpose())
.collect::<Result<HashSet<_>>>()?;
if !saw_create_event {
return Err(Error::BadServerResponse("State contained no create event.")); return Err(Error::BadServerResponse("State contained no create event."));
} }
db.rooms.force_state( warn!("Validating chain");
parsed_chain.par_iter_mut().for_each(|chunk| {
let mut bad_events = Vec::new();
for (event_id, value) in chunk.iter_mut() {
if let Err(e) = ruma::signatures::verify_event(
&*pub_key_map.read().unwrap(),
&value,
&room_version,
) {
warn!("Event {} failed verification {:?} {}", event_id, value, e);
bad_events.push(event_id.clone());
continue;
}
value.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
}
for id in bad_events {
chunk.remove(&id);
}
});
warn!("Inserting chain");
db.rooms
.add_pdu_outlier_batch(&mut parsed_chain.iter().flatten())?;
warn!("Forcing state of room");
db.rooms.force_state_new(
room_id, room_id,
state state,
.into_iter() &mut parsed_state.iter().flat_map(|m| m.values()),
.map(|(k, id)| db.rooms.compress_state_event(k, &id, &db.globals))
.collect::<Result<HashSet<_>>>()?,
db, db,
)?; )?;
for result in send_join_response
.room_state
.auth_chain
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, db))
{
let (event_id, value) = match result {
Ok(t) => t,
Err(_) => continue,
};
db.rooms.add_pdu_outlier(&event_id, &value)?;
}
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
warn!("Appending join event to state");
let statehashid = db.rooms.append_to_state(&pdu, &db.globals)?; let statehashid = db.rooms.append_to_state(&pdu, &db.globals)?;
warn!("Adding join event to db");
db.rooms.append_pdu( db.rooms.append_pdu(
&pdu, &pdu,
utils::to_canonical_object(&pdu).expect("Pdu is valid canonical object"), utils::to_canonical_object(&pdu).expect("Pdu is valid canonical object"),
@ -752,6 +861,7 @@ async fn join_room_by_id_helper(
db, db,
)?; )?;
warn!("Updating room state to join event");
// We set the room state after inserting the pdu, so that we never have a moment in time // We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist // where events in the current room state do not exist
db.rooms.set_room_state(room_id, statehashid)?; db.rooms.set_room_state(room_id, statehashid)?;
@ -769,7 +879,10 @@ async fn join_room_by_id_helper(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&event).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(sender_user.to_string()), state_key: Some(sender_user.to_string()),
redacts: None, redacts: None,
@ -788,16 +901,15 @@ async fn join_room_by_id_helper(
Ok(join_room_by_id::Response::new(room_id.clone()).into()) Ok(join_room_by_id::Response::new(room_id.clone()).into())
} }
fn validate_and_add_event_id( fn get_event_id(
pdu: &Raw<Pdu>, pdu: &Raw<Pdu>,
room_version: &RoomVersionId, room_version: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
db: &Database,
) -> Result<(EventId, CanonicalJsonObject)> { ) -> Result<(EventId, CanonicalJsonObject)> {
let mut value = serde_json::from_str::<CanonicalJsonObject>(pdu.json().get()).map_err(|e| { let value = serde_json::from_str::<CanonicalJsonObject>(pdu.json().get()).map_err(|e| {
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e); warn!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response") Error::BadServerResponse("Invalid PDU in server response")
})?; })?;
let event_id = EventId::try_from(&*format!( let event_id = EventId::try_from(&*format!(
"${}", "${}",
ruma::signatures::reference_hash(&value, room_version) ruma::signatures::reference_hash(&value, room_version)
@ -805,49 +917,6 @@ fn validate_and_add_event_id(
)) ))
.expect("ruma's reference hashes are valid event ids"); .expect("ruma's reference hashes are valid event ids");
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
};
if let Some((time, tries)) = db
.globals
.bad_event_ratelimiter
.read()
.unwrap()
.get(&event_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", event_id);
return Err(Error::BadServerResponse("bad event, still backing off"));
}
}
if let Err(e) = ruma::signatures::verify_event(
&*pub_key_map
.read()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?,
&value,
room_version,
) {
warn!("Event {} failed verification {:?} {}", event_id, pdu, e);
back_off(event_id);
return Err(Error::BadServerResponse("Event failed verification."));
}
value.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
Ok((event_id, value)) Ok((event_id, value))
} }
@ -884,7 +953,7 @@ pub(crate) async fn invite_helper<'a>(
let create_event_content = create_event let create_event_content = create_event
.as_ref() .as_ref()
.map(|create_event| { .map(|create_event| {
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) serde_json::from_str::<Raw<CreateEventContent>>(create_event.content.get())
.expect("Raw::from_value always works.") .expect("Raw::from_value always works.")
.deserialize() .deserialize()
.map_err(|e| { .map_err(|e| {
@ -910,16 +979,19 @@ pub(crate) async fn invite_helper<'a>(
let room_version = let room_version =
RoomVersion::new(&room_version_id).expect("room version is supported"); RoomVersion::new(&room_version_id).expect("room version is supported");
let content = serde_json::to_value(MemberEventContent { let content = RawValue::from_string(
avatar_url: None, serde_json::to_string(&MemberEventContent {
displayname: None, avatar_url: None,
is_direct: Some(is_direct), displayname: None,
membership: MembershipState::Invite, is_direct: Some(is_direct),
third_party_invite: None, membership: MembershipState::Invite,
blurhash: None, third_party_invite: None,
reason: None, blurhash: None,
}) reason: None,
.expect("member event is valid value"); })
.expect("member event is valid value"),
)
.expect("string is valid");
let state_key = user_id.to_string(); let state_key = user_id.to_string();
let kind = EventType::RoomMember; let kind = EventType::RoomMember;
@ -946,7 +1018,7 @@ pub(crate) async fn invite_helper<'a>(
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
unsigned.insert( unsigned.insert(
"prev_sender".to_owned(), "prev_sender".to_owned(),
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"), serde_json::from_str(prev_pdu.sender.as_str()).expect("UserId is valid string"),
); );
} }
@ -959,6 +1031,7 @@ pub(crate) async fn invite_helper<'a>(
.expect("time is valid"), .expect("time is valid"),
kind, kind,
content, content,
parsed_content: RwLock::new(None),
state_key: Some(state_key), state_key: Some(state_key),
prev_events, prev_events,
depth, depth,
@ -967,11 +1040,20 @@ pub(crate) async fn invite_helper<'a>(
.map(|(_, pdu)| pdu.event_id.clone()) .map(|(_, pdu)| pdu.event_id.clone())
.collect(), .collect(),
redacts: None, redacts: None,
unsigned, unsigned: if unsigned.is_empty() {
None
} else {
Some(
RawValue::from_string(
serde_json::to_string(&unsigned).expect("to_string always works"),
)
.expect("string is valid"),
)
},
hashes: ruma::events::pdu::EventHash { hashes: ruma::events::pdu::EventHash {
sha256: "aaa".to_owned(), sha256: "aaa".to_owned(),
}, },
signatures: BTreeMap::new(), signatures: None,
}; };
let auth_check = state_res::auth_check( let auth_check = state_res::auth_check(
@ -1116,16 +1198,19 @@ pub(crate) async fn invite_helper<'a>(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent { content: RawValue::from_string(
membership: member::MembershipState::Invite, serde_json::to_string(&member::MemberEventContent {
displayname: db.users.displayname(user_id)?, membership: member::MembershipState::Invite,
avatar_url: db.users.avatar_url(user_id)?, displayname: db.users.displayname(user_id)?,
is_direct: Some(is_direct), avatar_url: db.users.avatar_url(user_id)?,
third_party_invite: None, is_direct: Some(is_direct),
blurhash: db.users.blurhash(user_id)?, third_party_invite: None,
reason: None, blurhash: db.users.blurhash(user_id)?,
}) reason: None,
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(user_id.to_string()), state_key: Some(user_id.to_string()),
redacts: None, redacts: None,

92
src/client_server/profile.rs

@ -10,8 +10,8 @@ use ruma::{
federation::{self, query::get_profile_information::v1::ProfileField}, federation::{self, query::get_profile_information::v1::ProfileField},
}, },
events::EventType, events::EventType,
serde::Raw,
}; };
use serde_json::value::RawValue;
use std::{convert::TryInto, sync::Arc}; use std::{convert::TryInto, sync::Arc};
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
@ -45,29 +45,30 @@ pub async fn set_displayname_route(
Ok::<_, Error>(( Ok::<_, Error>((
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent { content: RawValue::from_string(
displayname: body.displayname.clone(), serde_json::to_string(&ruma::events::room::member::MemberEventContent {
..serde_json::from_value::<Raw<_>>( displayname: body.displayname.clone(),
db.rooms ..serde_json::from_str(
.room_state_get( db.rooms
&room_id, .room_state_get(
&EventType::RoomMember, &room_id,
&sender_user.to_string(), &EventType::RoomMember,
)? &sender_user.to_string(),
.ok_or_else(|| { )?
Error::bad_database( .ok_or_else(|| {
"Tried to send displayname update for user not in the \ Error::bad_database(
"Tried to send displayname update for user not in the \
room.", room.",
) )
})? })?
.content .content
.clone(), .get(),
) )
.expect("from_value::<Raw<..>> can never fail") .map_err(|_| Error::bad_database("Database contains invalid PDU."))?
.deserialize() })
.map_err(|_| Error::bad_database("Database contains invalid PDU."))? .expect("event is valid, we just created it"),
}) )
.expect("event is valid, we just created it"), .expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(sender_user.to_string()), state_key: Some(sender_user.to_string()),
redacts: None, redacts: None,
@ -190,29 +191,30 @@ pub async fn set_avatar_url_route(
Ok::<_, Error>(( Ok::<_, Error>((
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent { content: RawValue::from_string(
avatar_url: body.avatar_url.clone(), serde_json::to_string(&ruma::events::room::member::MemberEventContent {
..serde_json::from_value::<Raw<_>>( avatar_url: body.avatar_url.clone(),
db.rooms ..serde_json::from_str(
.room_state_get( db.rooms
&room_id, .room_state_get(
&EventType::RoomMember, &room_id,
&sender_user.to_string(), &EventType::RoomMember,
)? &sender_user.to_string(),
.ok_or_else(|| { )?
Error::bad_database( .ok_or_else(|| {
"Tried to send displayname update for user not in the \ Error::bad_database(
"Tried to send displayname update for user not in the \
room.", room.",
) )
})? })?
.content .content
.clone(), .get(),
) )
.expect("from_value::<Raw<..>> can never fail") .map_err(|_| Error::bad_database("Database contains invalid PDU."))?
.deserialize() })
.map_err(|_| Error::bad_database("Database contains invalid PDU."))? .expect("event is valid, we just created it"),
}) )
.expect("event is valid, we just created it"), .expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(sender_user.to_string()), state_key: Some(sender_user.to_string()),
redacts: None, redacts: None,

12
src/client_server/redact.rs

@ -8,6 +8,7 @@ use ruma::{
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
use rocket::put; use rocket::put;
use serde_json::value::RawValue;
/// # `PUT /_matrix/client/r0/rooms/{roomId}/redact/{eventId}/{txnId}` /// # `PUT /_matrix/client/r0/rooms/{roomId}/redact/{eventId}/{txnId}`
/// ///
@ -38,10 +39,13 @@ pub async fn redact_event_route(
let event_id = db.rooms.build_and_append_pdu( let event_id = db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomRedaction, event_type: EventType::RoomRedaction,
content: serde_json::to_value(redaction::RedactionEventContent { content: RawValue::from_string(
reason: body.reason.clone(), serde_json::to_string(&redaction::RedactionEventContent {
}) reason: body.reason.clone(),
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: None, state_key: None,
redacts: Some(body.event_id.clone()), redacts: Some(body.event_id.clone()),

177
src/client_server/room.rs

@ -11,9 +11,9 @@ use ruma::{
room::{guest_access, history_visibility, join_rules, member, name, topic}, room::{guest_access, history_visibility, join_rules, member, name, topic},
EventType, EventType,
}, },
serde::Raw,
RoomAliasId, RoomId, RoomVersionId, RoomAliasId, RoomId, RoomVersionId,
}; };
use serde_json::value::RawValue;
use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc}; use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc};
use tracing::{info, warn}; use tracing::{info, warn};
@ -101,7 +101,10 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomCreate, event_type: EventType::RoomCreate,
content: serde_json::to_value(content).expect("event is valid, we just created it"), content: RawValue::from_string(
serde_json::to_string(&content).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -116,16 +119,19 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent { content: RawValue::from_string(
membership: member::MembershipState::Join, serde_json::to_string(&member::MemberEventContent {
displayname: db.users.displayname(sender_user)?, membership: member::MembershipState::Join,
avatar_url: db.users.avatar_url(sender_user)?, displayname: db.users.displayname(sender_user)?,
is_direct: Some(body.is_direct), avatar_url: db.users.avatar_url(sender_user)?,
third_party_invite: None, is_direct: Some(body.is_direct),
blurhash: db.users.blurhash(sender_user)?, third_party_invite: None,
reason: None, blurhash: db.users.blurhash(sender_user)?,
}) reason: None,
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(sender_user.to_string()), state_key: Some(sender_user.to_string()),
redacts: None, redacts: None,
@ -180,7 +186,8 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomPowerLevels, event_type: EventType::RoomPowerLevels,
content: power_levels_content, content: RawValue::from_string(power_levels_content.to_string())
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -196,13 +203,16 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomCanonicalAlias, event_type: EventType::RoomCanonicalAlias,
content: serde_json::to_value( content: RawValue::from_string(
ruma::events::room::canonical_alias::CanonicalAliasEventContent { serde_json::to_string(
alias: Some(room_alias_id.clone()), &ruma::events::room::canonical_alias::CanonicalAliasEventContent {
alt_aliases: vec![], alias: Some(room_alias_id.clone()),
}, alt_aliases: vec![],
},
)
.expect("We checked that alias earlier, it must be fine"),
) )
.expect("We checked that alias earlier, it must be fine"), .expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -220,17 +230,15 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomJoinRules, event_type: EventType::RoomJoinRules,
content: match preset { content: RawValue::from_string(
create_room::RoomPreset::PublicChat => serde_json::to_value( serde_json::to_string(&join_rules::JoinRulesEventContent::new(match preset {
join_rules::JoinRulesEventContent::new(join_rules::JoinRule::Public), create_room::RoomPreset::PublicChat => join_rules::JoinRule::Public,
) // according to spec "invite" is the default
_ => join_rules::JoinRule::Invite,
}))
.expect("event is valid, we just created it"), .expect("event is valid, we just created it"),
// according to spec "invite" is the default )
_ => serde_json::to_value(join_rules::JoinRulesEventContent::new( .expect("string is valid"),
join_rules::JoinRule::Invite,
))
.expect("event is valid, we just created it"),
},
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -245,10 +253,13 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomHistoryVisibility, event_type: EventType::RoomHistoryVisibility,
content: serde_json::to_value(history_visibility::HistoryVisibilityEventContent::new( content: RawValue::from_string(
history_visibility::HistoryVisibility::Shared, serde_json::to_string(&history_visibility::HistoryVisibilityEventContent::new(
)) history_visibility::HistoryVisibility::Shared,
.expect("event is valid, we just created it"), ))
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -263,18 +274,14 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomGuestAccess, event_type: EventType::RoomGuestAccess,
content: match preset { content: RawValue::from_string(
create_room::RoomPreset::PublicChat => { serde_json::to_string(&guest_access::GuestAccessEventContent::new(match preset {
serde_json::to_value(guest_access::GuestAccessEventContent::new( create_room::RoomPreset::PublicChat => guest_access::GuestAccess::Forbidden,
guest_access::GuestAccess::Forbidden, _ => guest_access::GuestAccess::CanJoin,
)) }))
.expect("event is valid, we just created it")
}
_ => serde_json::to_value(guest_access::GuestAccessEventContent::new(
guest_access::GuestAccess::CanJoin,
))
.expect("event is valid, we just created it"), .expect("event is valid, we just created it"),
}, )
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -306,8 +313,11 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomName, event_type: EventType::RoomName,
content: serde_json::to_value(name::NameEventContent::new(Some(name.clone()))) content: RawValue::from_string(
.expect("event is valid, we just created it"), serde_json::to_string(&name::NameEventContent::new(Some(name.clone())))
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -323,10 +333,13 @@ pub async fn create_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomTopic, event_type: EventType::RoomTopic,
content: serde_json::to_value(topic::TopicEventContent { content: RawValue::from_string(
topic: topic.clone(), serde_json::to_string(&topic::TopicEventContent {
}) topic: topic.clone(),
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -477,11 +490,14 @@ pub async fn upgrade_room_route(
let tombstone_event_id = db.rooms.build_and_append_pdu( let tombstone_event_id = db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomTombstone, event_type: EventType::RoomTombstone,
content: serde_json::to_value(ruma::events::room::tombstone::TombstoneEventContent { content: RawValue::from_string(
body: "This room has been replaced".to_string(), serde_json::to_string(&ruma::events::room::tombstone::TombstoneEventContent {
replacement_room: replacement_room.clone(), body: "This room has been replaced".to_string(),
}) replacement_room: replacement_room.clone(),
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -505,15 +521,13 @@ pub async fn upgrade_room_route(
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
// Get the old room federations status // Get the old room federations status
let federate = serde_json::from_value::<Raw<ruma::events::room::create::CreateEventContent>>( let federate = serde_json::from_str::<ruma::events::room::create::CreateEventContent>(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomCreate, "")? .room_state_get(&body.room_id, &EventType::RoomCreate, "")?
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
.content .content
.clone(), .get(),
) )
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid room event in database."))? .map_err(|_| Error::bad_database("Invalid room event in database."))?
.federate; .federate;
@ -533,8 +547,11 @@ pub async fn upgrade_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomCreate, event_type: EventType::RoomCreate,
content: serde_json::to_value(create_event_content) content: RawValue::from_string(
.expect("event is valid, we just created it"), serde_json::to_string(&create_event_content)
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,
@ -549,16 +566,19 @@ pub async fn upgrade_room_route(
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent { content: RawValue::from_string(
membership: member::MembershipState::Join, serde_json::to_string(&member::MemberEventContent {
displayname: db.users.displayname(sender_user)?, membership: member::MembershipState::Join,
avatar_url: db.users.avatar_url(sender_user)?, displayname: db.users.displayname(sender_user)?,
is_direct: None, avatar_url: db.users.avatar_url(sender_user)?,
third_party_invite: None, is_direct: None,
blurhash: db.users.blurhash(sender_user)?, third_party_invite: None,
reason: None, blurhash: db.users.blurhash(sender_user)?,
}) reason: None,
.expect("event is valid, we just created it"), })
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(sender_user.to_string()), state_key: Some(sender_user.to_string()),
redacts: None, redacts: None,
@ -585,7 +605,7 @@ pub async fn upgrade_room_route(
// Replicate transferable state events to the new room // Replicate transferable state events to the new room
for event_type in transferable_state_events { for event_type in transferable_state_events {
let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? { let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? {
Some(v) => v.content.clone(), Some(v) => v.content,
None => continue, // Skipping missing events. None => continue, // Skipping missing events.
}; };
@ -612,15 +632,13 @@ pub async fn upgrade_room_route(
// Get the old room power levels // Get the old room power levels
let mut power_levels_event_content = let mut power_levels_event_content =
serde_json::from_value::<Raw<ruma::events::room::power_levels::PowerLevelsEventContent>>( serde_json::from_str::<ruma::events::room::power_levels::PowerLevelsEventContent>(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")? .room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")?
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
.content .content
.clone(), .get(),
) )
.expect("database contains invalid PDU")
.deserialize()
.map_err(|_| Error::bad_database("Invalid room event in database."))?; .map_err(|_| Error::bad_database("Invalid room event in database."))?;
// Setting events_default and invite to the greater of 50 and users_default + 1 // Setting events_default and invite to the greater of 50 and users_default + 1
@ -635,8 +653,11 @@ pub async fn upgrade_room_route(
let _ = db.rooms.build_and_append_pdu( let _ = db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomPowerLevels, event_type: EventType::RoomPowerLevels,
content: serde_json::to_value(power_levels_event_content) content: RawValue::from_string(
.expect("event is valid, we just created it"), serde_json::to_string(&power_levels_event_content)
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
redacts: None, redacts: None,

10
src/client_server/state.rs

@ -112,7 +112,7 @@ pub async fn get_state_events_route(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
.map(|event| { .map(|event| {
serde_json::from_value::<HistoryVisibilityEventContent>(event.content.clone()) serde_json::from_str::<HistoryVisibilityEventContent>(event.content.get())
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
"Invalid room history visibility event in database.", "Invalid room history visibility event in database.",
@ -164,7 +164,7 @@ pub async fn get_state_events_for_key_route(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
.map(|event| { .map(|event| {
serde_json::from_value::<HistoryVisibilityEventContent>(event.content.clone()) serde_json::from_str::<HistoryVisibilityEventContent>(event.content.get())
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
"Invalid room history visibility event in database.", "Invalid room history visibility event in database.",
@ -190,7 +190,7 @@ pub async fn get_state_events_for_key_route(
))?; ))?;
Ok(get_state_events_for_key::Response { Ok(get_state_events_for_key::Response {
content: serde_json::from_value(event.content.clone()) content: serde_json::from_str(event.content.get())
.map_err(|_| Error::bad_database("Invalid event content in database"))?, .map_err(|_| Error::bad_database("Invalid event content in database"))?,
} }
.into()) .into())
@ -220,7 +220,7 @@ pub async fn get_state_events_for_empty_key_route(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
.map(|event| { .map(|event| {
serde_json::from_value::<HistoryVisibilityEventContent>(event.content.clone()) serde_json::from_str::<HistoryVisibilityEventContent>(event.content.get())
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
"Invalid room history visibility event in database.", "Invalid room history visibility event in database.",
@ -246,7 +246,7 @@ pub async fn get_state_events_for_empty_key_route(
))?; ))?;
Ok(get_state_events_for_key::Response { Ok(get_state_events_for_key::Response {
content: serde_json::from_value(event.content.clone()) content: serde_json::from_str(event.content.get())
.map_err(|_| Error::bad_database("Invalid event content in database"))?, .map_err(|_| Error::bad_database("Invalid event content in database"))?,
} }
.into()) .into())

18
src/client_server/sync.rs

@ -287,9 +287,9 @@ async fn sync_helper(
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
.filter(|(_, pdu)| pdu.kind == EventType::RoomMember) .filter(|(_, pdu)| pdu.kind == EventType::RoomMember)
.map(|(_, pdu)| { .map(|(_, pdu)| {
let content = serde_json::from_value::< let content = serde_json::from_str::<
ruma::events::room::member::MemberEventContent, ruma::events::room::member::MemberEventContent,
>(pdu.content.clone()) >(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid member event in database."))?; .map_err(|_| Error::bad_database("Invalid member event in database."))?;
if let Some(state_key) = &pdu.state_key { if let Some(state_key) = &pdu.state_key {
@ -371,11 +371,9 @@ async fn sync_helper(
sender_user.as_str(), sender_user.as_str(),
)? )?
.and_then(|pdu| { .and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>( serde_json::from_str::<ruma::events::room::member::MemberEventContent>(
pdu.content.clone(), pdu.content.get(),
) )
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database.")) .map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok() .ok()
}); });
@ -432,11 +430,9 @@ async fn sync_helper(
continue; continue;
} }
let new_membership = serde_json::from_value::< let new_membership = serde_json::from_str::<
Raw<ruma::events::room::member::MemberEventContent>, ruma::events::room::member::MemberEventContent,
>(state_event.content.clone()) >(state_event.content.get())
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))? .map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership; .membership;

8
src/database/admin.rs

@ -9,6 +9,7 @@ use ruma::{
events::{room::message, EventType}, events::{room::message, EventType},
UserId, UserId,
}; };
use serde_json::value::RawValue;
use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard}; use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard};
use tracing::warn; use tracing::warn;
@ -66,8 +67,11 @@ impl Admin {
.build_and_append_pdu( .build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMessage, event_type: EventType::RoomMessage,
content: serde_json::to_value(message) content: RawValue::from_string(
.expect("event is valid, we just created it"), serde_json::to_string(&message)
.expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: None, state_key: None,
redacts: None, redacts: None,

30
src/database/pusher.rs

@ -9,8 +9,10 @@ use ruma::{
}, },
IncomingResponse, OutgoingRequest, SendAccessToken, IncomingResponse, OutgoingRequest, SendAccessToken,
}, },
events::{room::power_levels::PowerLevelsEventContent, AnySyncRoomEvent, EventType}, events::{
identifiers::RoomName, room::{name::NameEventContent, power_levels::PowerLevelsEventContent},
AnySyncRoomEvent, EventType,
},
push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak}, push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
serde::Raw, serde::Raw,
uint, RoomId, UInt, UserId, uint, RoomId, UInt, UserId,
@ -181,7 +183,7 @@ pub async fn send_push_notice(
.rooms .rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| { .map(|ev| {
serde_json::from_value(ev.content.clone()) serde_json::from_str(ev.content.get())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event")) .map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
}) })
.transpose()? .transpose()?
@ -318,16 +320,18 @@ async fn send_notice(
let user_name = db.users.displayname(&event.sender)?; let user_name = db.users.displayname(&event.sender)?;
notifi.sender_display_name = user_name.as_deref(); notifi.sender_display_name = user_name.as_deref();
let room_name = db
.rooms let room_name = if let Some(room_name_pdu) =
.room_state_get(&event.room_id, &EventType::RoomName, "")? db.rooms
.map(|pdu| match pdu.content.get("name") { .room_state_get(&event.room_id, &EventType::RoomName, "")?
Some(serde_json::Value::String(s)) => { {
Some(Box::<RoomName>::try_from(&**s).expect("room name is valid")) serde_json::from_str::<NameEventContent>(room_name_pdu.content.get())
} .map_err(|_| Error::bad_database("Invalid room name event in database."))?
_ => None, .name
}) } else {
.flatten(); None
};
notifi.room_name = room_name.as_deref(); notifi.room_name = room_name.as_deref();
send_request( send_request(

227
src/database/rooms.rs

@ -2,6 +2,7 @@ mod edus;
pub use edus::RoomEdus; pub use edus::RoomEdus;
use member::MembershipState; use member::MembershipState;
use serde_json::value::RawValue;
use crate::{pdu::PduBuilder, server_server, utils, Database, Error, PduEvent, Result}; use crate::{pdu::PduBuilder, server_server, utils, Database, Error, PduEvent, Result};
use lru_cache::LruCache; use lru_cache::LruCache;
@ -96,7 +97,7 @@ pub struct Rooms {
/// RoomId + EventId -> Parent PDU EventId. /// RoomId + EventId -> Parent PDU EventId.
pub(super) referencedevents: Arc<dyn Tree>, pub(super) referencedevents: Arc<dyn Tree>,
pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>, pub(super) pdu_cache: Mutex<LruCache<EventId, Vec<u8>>>,
pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>, pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>,
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>, pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,
pub(super) eventidshort_cache: Mutex<LruCache<EventId, u64>>, pub(super) eventidshort_cache: Mutex<LruCache<EventId, u64>>,
@ -137,7 +138,7 @@ impl Rooms {
pub fn state_full( pub fn state_full(
&self, &self,
shortstatehash: u64, shortstatehash: u64,
) -> Result<HashMap<(EventType, String), Arc<PduEvent>>> { ) -> Result<HashMap<(EventType, String), PduEvent>> {
let full_state = self let full_state = self
.load_shortstatehash_info(shortstatehash)? .load_shortstatehash_info(shortstatehash)?
.pop() .pop()
@ -199,7 +200,7 @@ impl Rooms {
shortstatehash: u64, shortstatehash: u64,
event_type: &EventType, event_type: &EventType,
state_key: &str, state_key: &str,
) -> Result<Option<Arc<PduEvent>>> { ) -> Result<Option<PduEvent>> {
self.state_get_id(shortstatehash, event_type, state_key)? self.state_get_id(shortstatehash, event_type, state_key)?
.map_or(Ok(None), |event_id| self.get_pdu(&event_id)) .map_or(Ok(None), |event_id| self.get_pdu(&event_id))
} }
@ -243,8 +244,8 @@ impl Rooms {
kind: &EventType, kind: &EventType,
sender: &UserId, sender: &UserId,
state_key: Option<&str>, state_key: Option<&str>,
content: &serde_json::Value, content: &serde_json::value::RawValue,
) -> Result<StateMap<Arc<PduEvent>>> { ) -> Result<StateMap<PduEvent>> {
let shortstatehash = let shortstatehash =
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
current_shortstatehash current_shortstatehash
@ -252,7 +253,12 @@ impl Rooms {
return Ok(HashMap::new()); return Ok(HashMap::new());
}; };
let auth_events = state_res::auth_types_for_event(kind, sender, state_key, content); let auth_events = state_res::auth_types_for_event(
kind,
sender,
state_key,
&serde_json::from_str(content.get()).expect("RawValue is valid json"),
);
let mut sauthevents = auth_events let mut sauthevents = auth_events
.into_iter() .into_iter()
@ -329,6 +335,75 @@ impl Rooms {
.transpose() .transpose()
} }
/// Force the creation of a new StateHash and insert it into the db. This version should be
/// used when creating new rooms.
///
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
#[tracing::instrument(skip(self, new_state_ids_compressed, state_values, db))]
pub fn force_state_new(
&self,
room_id: &RoomId,
new_state_ids_compressed: HashSet<CompressedStateEvent>,
state_values: &mut dyn Iterator<Item = &CanonicalJsonObject>,
db: &Database,
) -> Result<()> {
let state_hash = self.calculate_hash(
&new_state_ids_compressed
.iter()
.map(|bytes| &bytes[..])
.collect::<Vec<_>>(),
);
let (new_shortstatehash, _) =
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
self.save_state_from_diff(
new_shortstatehash,
new_state_ids_compressed.clone(),
HashSet::new(),
2, // every state change is 2 event changes on average
Vec::new(),
)?;
for pdu in state_values {
if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") {
let content = pdu
.get("content")
.and_then(|o| o.as_object())
.ok_or_else(|| Error::bad_database("Invalid content in pdu."))?;
if let Some(membership) = content.get("membership").and_then(|membership| {
serde_json::from_str::<member::MembershipState>(
&serde_json::to_string(membership).expect("json is valid string"),
)
.ok()
}) {
if let Some(state_key) = pdu
.get("state_key")
.and_then(|o| o.as_str())
.and_then(|state_key| UserId::try_from(state_key).ok())
{
if let Some(sender) = pdu
.get("sender")
.and_then(|o| o.as_str())
.and_then(|state_key| UserId::try_from(state_key).ok())
{
self.update_membership(
room_id, &state_key, membership, &sender, None, db, false,
)?;
}
}
}
}
}
self.update_joined_count(room_id, db)?;
self.roomid_shortstatehash
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
Ok(())
}
/// Force the creation of a new StateHash and insert it into the db. /// Force the creation of a new StateHash and insert it into the db.
/// ///
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot. /// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
@ -393,29 +468,28 @@ impl Rooms {
}) { }) {
if let Some(pdu) = self.get_pdu_json(&event_id)? { if let Some(pdu) = self.get_pdu_json(&event_id)? {
if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") { if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") {
if let Ok(pdu) = serde_json::from_value::<PduEvent>( let content = pdu
serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"), .get("content")
) { .and_then(|o| o.as_object())
if let Some(membership) = .ok_or_else(|| Error::bad_database("Invalid content in pdu."))?;
pdu.content.get("membership").and_then(|membership| { if let Some(membership) = content.get("membership").and_then(|membership| {
serde_json::from_value::<member::MembershipState>( serde_json::from_str::<member::MembershipState>(
membership.clone(), &serde_json::to_string(membership).expect("json is valid string"),
) )
.ok() .ok()
}) }) {
if let Some(state_key) = pdu
.get("state_key")
.and_then(|o| o.as_str())
.and_then(|state_key| UserId::try_from(state_key).ok())
{ {
if let Some(state_key) = pdu if let Some(sender) = pdu
.state_key .get("sender")
.and_then(|o| o.as_str())
.and_then(|state_key| UserId::try_from(state_key).ok()) .and_then(|state_key| UserId::try_from(state_key).ok())
{ {
self.update_membership( self.update_membership(
room_id, room_id, &state_key, membership, &sender, None, db, false,
&state_key,
membership,
&pdu.sender,
None,
db,
false,
)?; )?;
} }
} }
@ -915,7 +989,7 @@ impl Rooms {
pub fn room_state_full( pub fn room_state_full(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
) -> Result<HashMap<(EventType, String), Arc<PduEvent>>> { ) -> Result<HashMap<(EventType, String), PduEvent>> {
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
self.state_full(current_shortstatehash) self.state_full(current_shortstatehash)
} else { } else {
@ -945,7 +1019,7 @@ impl Rooms {
room_id: &RoomId, room_id: &RoomId,
event_type: &EventType, event_type: &EventType,
state_key: &str, state_key: &str,
) -> Result<Option<Arc<PduEvent>>> { ) -> Result<Option<PduEvent>> {
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
self.state_get(current_shortstatehash, event_type, state_key) self.state_get(current_shortstatehash, event_type, state_key)
} else { } else {
@ -1073,9 +1147,11 @@ impl Rooms {
/// ///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline. /// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<Arc<PduEvent>>> { pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) { if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) {
return Ok(Some(Arc::clone(p))); return Ok(Some(
serde_json::from_slice(p).expect("pdus in cache are valid"),
));
} }
if let Some(pdu) = self if let Some(pdu) = self
@ -1092,18 +1168,13 @@ impl Rooms {
})?)) })?))
}, },
)? )?
.map(|pdu| {
serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
.map(Arc::new)
})
.transpose()?
{ {
self.pdu_cache let parsed =
.lock() serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."));
.unwrap()
.insert(event_id.clone(), Arc::clone(&pdu)); self.pdu_cache.lock().unwrap().insert(event_id.clone(), pdu);
Ok(Some(pdu))
parsed
} else { } else {
Ok(None) Ok(None)
} }
@ -1228,6 +1299,23 @@ impl Rooms {
) )
} }
/// Append the PDU as an outlier.
///
/// Any event given to this will be processed (state-res) on another thread.
#[tracing::instrument(skip(self, batch))]
pub fn add_pdu_outlier_batch(
&self,
batch: &mut dyn Iterator<Item = (&EventId, &CanonicalJsonObject)>,
) -> Result<()> {
self.eventid_outlierpdu
.insert_batch(&mut batch.map(|(event_id, pdu)| {
(
event_id.as_bytes().to_vec(),
serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"),
)
}))
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> { pub fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> {
self.softfailedeventids.insert(event_id.as_bytes(), &[]) self.softfailedeventids.insert(event_id.as_bytes(), &[])
@ -1329,7 +1417,7 @@ impl Rooms {
.rooms .rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| { .map(|ev| {
serde_json::from_value(ev.content.clone()) serde_json::from_str(ev.content.get())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event")) .map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
}) })
.transpose()? .transpose()?
@ -1408,8 +1496,11 @@ impl Rooms {
let target_user_id = UserId::try_from(state_key.clone()) let target_user_id = UserId::try_from(state_key.clone())
.expect("This state_key was previously validated"); .expect("This state_key was previously validated");
let content = serde_json::from_str::<serde_json::Value>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
let membership = serde_json::from_value::<member::MembershipState>( let membership = serde_json::from_value::<member::MembershipState>(
pdu.content content
.get("membership") .get("membership")
.ok_or(Error::BadRequest( .ok_or(Error::BadRequest(
ErrorKind::InvalidParam, ErrorKind::InvalidParam,
@ -1447,7 +1538,10 @@ impl Rooms {
} }
} }
EventType::RoomMessage => { EventType::RoomMessage => {
if let Some(body) = pdu.content.get("body").and_then(|b| b.as_str()) { let content = serde_json::from_str::<serde_json::Value>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if let Some(body) = content.get("body").and_then(|b| b.as_str()) {
let mut batch = body let mut batch = body
.split_terminator(|c: char| !c.is_alphanumeric()) .split_terminator(|c: char| !c.is_alphanumeric())
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
@ -1961,7 +2055,7 @@ impl Rooms {
let create_event_content = create_event let create_event_content = create_event
.as_ref() .as_ref()
.map(|create_event| { .map(|create_event| {
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) serde_json::from_str::<Raw<CreateEventContent>>(create_event.content.get())
.expect("Raw::from_value always works.") .expect("Raw::from_value always works.")
.deserialize() .deserialize()
.map_err(|e| { .map_err(|e| {
@ -2000,7 +2094,10 @@ impl Rooms {
let mut unsigned = unsigned.unwrap_or_default(); let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key { if let Some(state_key) = &state_key {
if let Some(prev_pdu) = self.room_state_get(room_id, &event_type, state_key)? { if let Some(prev_pdu) = self.room_state_get(room_id, &event_type, state_key)? {
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); unsigned.insert(
"prev_content".to_owned(),
serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"),
);
unsigned.insert( unsigned.insert(
"prev_sender".to_owned(), "prev_sender".to_owned(),
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"), serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
@ -2017,6 +2114,7 @@ impl Rooms {
.expect("time is valid"), .expect("time is valid"),
kind: event_type, kind: event_type,
content, content,
parsed_content: RwLock::new(None),
state_key, state_key,
prev_events, prev_events,
depth, depth,
@ -2025,11 +2123,20 @@ impl Rooms {
.map(|(_, pdu)| pdu.event_id.clone()) .map(|(_, pdu)| pdu.event_id.clone())
.collect(), .collect(),
redacts, redacts,
unsigned, unsigned: if unsigned.is_empty() {
None
} else {
Some(
RawValue::from_string(
serde_json::to_string(&unsigned).expect("to_string always works"),
)
.expect("string is valid"),
)
},
hashes: ruma::events::pdu::EventHash { hashes: ruma::events::pdu::EventHash {
sha256: "aaa".to_owned(), sha256: "aaa".to_owned(),
}, },
signatures: BTreeMap::new(), signatures: None,
}; };
let auth_check = state_res::auth_check( let auth_check = state_res::auth_check(
@ -2205,7 +2312,7 @@ impl Rooms {
let mut pdu = serde_json::from_slice::<PduEvent>(&v) let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id { if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id"); pdu.remove_transaction_id()?;
} }
Ok((pdu_id, pdu)) Ok((pdu_id, pdu))
})) }))
@ -2242,7 +2349,7 @@ impl Rooms {
let mut pdu = serde_json::from_slice::<PduEvent>(&v) let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id { if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id"); pdu.remove_transaction_id()?;
} }
Ok((pdu_id, pdu)) Ok((pdu_id, pdu))
})) }))
@ -2279,7 +2386,7 @@ impl Rooms {
let mut pdu = serde_json::from_slice::<PduEvent>(&v) let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id { if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id"); pdu.remove_transaction_id()?;
} }
Ok((pdu_id, pdu)) Ok((pdu_id, pdu))
})) }))
@ -2348,11 +2455,9 @@ impl Rooms {
if let Some(predecessor) = self if let Some(predecessor) = self
.room_state_get(room_id, &EventType::RoomCreate, "")? .room_state_get(room_id, &EventType::RoomCreate, "")?
.and_then(|create| { .and_then(|create| {
serde_json::from_value::< serde_json::from_str::<ruma::events::room::create::CreateEventContent>(
Raw<ruma::events::room::create::CreateEventContent>, create.content.get(),
>(create.content.clone()) )
.expect("Raw::from_value always works")
.deserialize()
.ok() .ok()
}) })
.and_then(|content| content.predecessor) .and_then(|content| content.predecessor)
@ -2700,17 +2805,15 @@ impl Rooms {
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>( let mut event = serde_json::from_str::<member::MemberEventContent>(
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
.ok_or(Error::BadRequest( .ok_or(Error::BadRequest(
ErrorKind::BadState, ErrorKind::BadState,
"Cannot leave a room you are not a member of.", "Cannot leave a room you are not a member of.",
))? ))?
.content .content
.clone(), .get(),
) )
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?; .map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = member::MembershipState::Leave; event.membership = member::MembershipState::Leave;
@ -2718,8 +2821,10 @@ impl Rooms {
self.build_and_append_pdu( self.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: EventType::RoomMember, event_type: EventType::RoomMember,
content: serde_json::to_value(event) content: RawValue::from_string(
.expect("event is valid, we just created it"), serde_json::to_string(&event).expect("event is valid, we just created it"),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(user_id.to_string()), state_key: Some(user_id.to_string()),
redacts: None, redacts: None,

10
src/database/sending.rs

@ -573,8 +573,14 @@ impl Sending {
for pdu in pdus { for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them) // Redacted events are not notification targets (we don't send push for them)
if pdu.unsigned.get("redacted_because").is_some() { if let Some(unsigned) = &pdu.unsigned {
continue; if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(unsigned.get())
{
if unsigned.get("redacted_because").is_some() {
continue;
}
}
} }
let userid = let userid =

89
src/pdu.rs

@ -6,15 +6,15 @@ use ruma::{
AnySyncStateEvent, EventType, StateEvent, AnySyncStateEvent, EventType, StateEvent,
}, },
serde::{CanonicalJsonObject, CanonicalJsonValue, Raw}, serde::{CanonicalJsonObject, CanonicalJsonValue, Raw},
state_res, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, state_res, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, UInt,
ServerSigningKeyId, UInt, UserId, UserId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::{json, value::RawValue};
use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom}; use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, sync::RwLock};
use tracing::warn; use tracing::warn;
#[derive(Clone, Deserialize, Serialize, Debug)] #[derive(Deserialize, Serialize, Debug)]
pub struct PduEvent { pub struct PduEvent {
pub event_id: EventId, pub event_id: EventId,
pub room_id: RoomId, pub room_id: RoomId,
@ -22,7 +22,9 @@ pub struct PduEvent {
pub origin_server_ts: UInt, pub origin_server_ts: UInt,
#[serde(rename = "type")] #[serde(rename = "type")]
pub kind: EventType, pub kind: EventType,
pub content: serde_json::Value, pub content: Box<serde_json::value::RawValue>,
#[serde(skip)]
pub parsed_content: RwLock<Option<serde_json::Value>>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub state_key: Option<String>, pub state_key: Option<String>,
pub prev_events: Vec<EventId>, pub prev_events: Vec<EventId>,
@ -30,16 +32,17 @@ pub struct PduEvent {
pub auth_events: Vec<EventId>, pub auth_events: Vec<EventId>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub redacts: Option<EventId>, pub redacts: Option<EventId>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub unsigned: BTreeMap<String, serde_json::Value>, pub unsigned: Option<Box<serde_json::value::RawValue>>,
pub hashes: EventHash, pub hashes: EventHash,
pub signatures: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>, #[serde(default, skip_serializing_if = "Option::is_none")]
pub signatures: Option<Box<serde_json::value::RawValue>>, // BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>
} }
impl PduEvent { impl PduEvent {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn redact(&mut self, reason: &PduEvent) -> crate::Result<()> { pub fn redact(&mut self, reason: &PduEvent) -> crate::Result<()> {
self.unsigned.clear(); self.unsigned = None;
let allowed: &[&str] = match self.kind { let allowed: &[&str] = match self.kind {
EventType::RoomMember => &["membership"], EventType::RoomMember => &["membership"],
@ -59,10 +62,9 @@ impl PduEvent {
_ => &[], _ => &[],
}; };
let old_content = self let mut old_content =
.content serde_json::from_str::<BTreeMap<String, serde_json::Value>>(self.content.get())
.as_object_mut() .map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
.ok_or_else(|| Error::bad_database("PDU in db has invalid content."))?;
let mut new_content = serde_json::Map::new(); let mut new_content = serde_json::Map::new();
@ -72,12 +74,29 @@ impl PduEvent {
} }
} }
self.unsigned.insert( self.unsigned = Some(RawValue::from_string(serde_json::to_string(&json!({
"redacted_because".to_owned(), "redacted_because": serde_json::to_value(reason).expect("to_value(PduEvent) always works")
serde_json::to_value(reason).expect("to_value(PduEvent) always works"), })).expect("to string always works")).expect("string is valid"));
);
self.content = RawValue::from_string(
serde_json::to_string(&new_content).expect("to string always works"),
)
.expect("string is valid");
Ok(())
}
self.content = new_content.into(); pub fn remove_transaction_id(&mut self) -> crate::Result<()> {
if let Some(unsigned) = &self.unsigned {
let mut unsigned =
serde_json::from_str::<BTreeMap<String, Box<RawValue>>>(unsigned.get())
.map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
unsigned.remove("transaction_id");
self.unsigned = Some(
RawValue::from_string(serde_json::to_string(&unsigned).expect("unsigned is valid"))
.expect("string is valid"),
);
}
Ok(()) Ok(())
} }
@ -265,8 +284,14 @@ impl state_res::Event for PduEvent {
&self.kind &self.kind
} }
fn content(&self) -> &serde_json::Value { fn content(&self) -> serde_json::Value {
&self.content self.parsed_content
.write()
.unwrap()
.get_or_insert_with(|| {
serde_json::to_value(&self.content).expect("content is valid json")
})
.clone()
} }
fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch { fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
@ -298,11 +323,17 @@ impl state_res::Event for PduEvent {
} }
fn signatures(&self) -> BTreeMap<Box<ServerName>, BTreeMap<ruma::ServerSigningKeyId, String>> { fn signatures(&self) -> BTreeMap<Box<ServerName>, BTreeMap<ruma::ServerSigningKeyId, String>> {
self.signatures.clone() self.signatures
.as_ref()
.map(|raw| serde_json::from_str(raw.get()).expect("string is valid signatures json"))
.unwrap_or_default()
} }
fn unsigned(&self) -> &BTreeMap<String, serde_json::Value> { fn unsigned(&self) -> BTreeMap<String, serde_json::Value> {
&self.unsigned self.unsigned
.as_ref()
.map(|raw| serde_json::from_str(raw.get()).expect("string is valid json"))
.unwrap_or_default()
} }
} }
@ -352,7 +383,7 @@ pub(crate) fn gen_event_id_canonical_json(
pub struct PduBuilder { pub struct PduBuilder {
#[serde(rename = "type")] #[serde(rename = "type")]
pub event_type: EventType, pub event_type: EventType,
pub content: serde_json::Value, pub content: Box<serde_json::value::RawValue>,
pub unsigned: Option<BTreeMap<String, serde_json::Value>>, pub unsigned: Option<BTreeMap<String, serde_json::Value>>,
pub state_key: Option<String>, pub state_key: Option<String>,
pub redacts: Option<EventId>, pub redacts: Option<EventId>,
@ -363,8 +394,12 @@ impl From<AnyInitialStateEvent> for PduBuilder {
fn from(event: AnyInitialStateEvent) -> Self { fn from(event: AnyInitialStateEvent) -> Self {
Self { Self {
event_type: EventType::from(event.event_type()), event_type: EventType::from(event.event_type()),
content: serde_json::value::to_value(event.content()) content: RawValue::from_string(
.expect("AnyStateEventContent came from JSON and can thus turn back into JSON."), serde_json::to_string(&event.content()).expect(
"AnyStateEventContent came from JSON and can thus turn back into JSON.",
),
)
.expect("string is valid"),
unsigned: None, unsigned: None,
state_key: Some(event.state_key().to_owned()), state_key: Some(event.state_key().to_owned()),
redacts: None, redacts: None,

201
src/server_server.rs

@ -39,7 +39,6 @@ use ruma::{
}, },
directory::{IncomingFilter, IncomingRoomNetwork}, directory::{IncomingFilter, IncomingRoomNetwork},
events::{ events::{
pdu::Pdu,
receipt::{ReceiptEvent, ReceiptEventContent}, receipt::{ReceiptEvent, ReceiptEventContent},
room::{ room::{
create::CreateEventContent, create::CreateEventContent,
@ -55,7 +54,9 @@ use ruma::{
uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
ServerSigningKeyId, ServerSigningKeyId,
}; };
use serde_json::value::RawValue;
use std::{ use std::{
cell::RefCell,
collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
fmt::Debug, fmt::Debug,
@ -1095,7 +1096,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
let start_time = Instant::now(); let start_time = Instant::now();
let event_id = pdu.event_id.clone(); let event_id = pdu.event_id.clone();
if let Err(e) = upgrade_outlier_to_timeline_pdu( if let Err(e) = upgrade_outlier_to_timeline_pdu(
pdu, &pdu,
json, json,
&create_event, &create_event,
origin, origin,
@ -1119,7 +1120,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
} }
upgrade_outlier_to_timeline_pdu( upgrade_outlier_to_timeline_pdu(
incoming_pdu, &incoming_pdu,
val, val,
&create_event, &create_event,
origin, origin,
@ -1139,8 +1140,7 @@ fn handle_outlier_pdu<'a>(
value: BTreeMap<String, CanonicalJsonValue>, value: BTreeMap<String, CanonicalJsonValue>,
db: &'a Database, db: &'a Database,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> ) -> AsyncRecursiveType<'a, StdResult<(PduEvent, BTreeMap<String, CanonicalJsonValue>), String>> {
{
Box::pin(async move { Box::pin(async move {
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
@ -1153,14 +1153,13 @@ fn handle_outlier_pdu<'a>(
// 2. Check signatures, otherwise drop // 2. Check signatures, otherwise drop
// 3. check content hash, redact if doesn't match // 3. check content hash, redact if doesn't match
let create_event_content = let create_event_content = serde_json::from_str::<CreateEventContent>(
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) create_event.content.get(),
.expect("Raw::from_value always works.") )
.deserialize() .map_err(|e| {
.map_err(|e| { warn!("Invalid create event: {}", e);
warn!("Invalid create event: {}", e); "Invalid create event in db.".to_owned()
"Invalid create event in db.".to_owned() })?;
})?;
let room_version_id = &create_event_content.room_version; let room_version_id = &create_event_content.room_version;
let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); let room_version = RoomVersion::new(room_version_id).expect("room version is supported");
@ -1241,7 +1240,7 @@ fn handle_outlier_pdu<'a>(
.expect("all auth events have state keys"), .expect("all auth events have state keys"),
)) { )) {
hash_map::Entry::Vacant(v) => { hash_map::Entry::Vacant(v) => {
v.insert(auth_event.clone()); v.insert(auth_event);
} }
hash_map::Entry::Occupied(_) => { hash_map::Entry::Occupied(_) => {
return Err( return Err(
@ -1253,11 +1252,7 @@ fn handle_outlier_pdu<'a>(
} }
// The original create event must be in the auth events // The original create event must be in the auth events
if auth_events if auth_events.get(&(EventType::RoomCreate, "".to_owned())) != Some(create_event) {
.get(&(EventType::RoomCreate, "".to_owned()))
.map(|a| a.as_ref())
!= Some(create_event)
{
return Err("Incoming event refers to wrong create event.".to_owned()); return Err("Incoming event refers to wrong create event.".to_owned());
} }
@ -1268,7 +1263,7 @@ fn handle_outlier_pdu<'a>(
db.rooms db.rooms
.get_pdu(&incoming_pdu.auth_events[0]) .get_pdu(&incoming_pdu.auth_events[0])
.map_err(|e| e.to_string())? .map_err(|e| e.to_string())?
.filter(|maybe_create| **maybe_create == *create_event) .filter(|maybe_create| maybe_create == create_event)
} else { } else {
None None
}; };
@ -1276,7 +1271,7 @@ fn handle_outlier_pdu<'a>(
if !state_res::event_auth::auth_check( if !state_res::event_auth::auth_check(
&room_version, &room_version,
&incoming_pdu, &incoming_pdu,
previous_create, previous_create.as_ref(),
None::<PduEvent>, // TODO: third party invite None::<PduEvent>, // TODO: third party invite
|k, s| auth_events.get(&(k.clone(), s.to_owned())), |k, s| auth_events.get(&(k.clone(), s.to_owned())),
) )
@ -1293,13 +1288,13 @@ fn handle_outlier_pdu<'a>(
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?; .map_err(|_| "Failed to add pdu as outlier.".to_owned())?;
debug!("Added pdu as outlier."); debug!("Added pdu as outlier.");
Ok((Arc::new(incoming_pdu), val)) Ok((incoming_pdu, val))
}) })
} }
#[tracing::instrument(skip(incoming_pdu, val, create_event, origin, db, room_id, pub_key_map))] #[tracing::instrument(skip(incoming_pdu, val, create_event, origin, db, room_id, pub_key_map))]
async fn upgrade_outlier_to_timeline_pdu( async fn upgrade_outlier_to_timeline_pdu(
incoming_pdu: Arc<PduEvent>, incoming_pdu: &PduEvent,
val: BTreeMap<String, CanonicalJsonValue>, val: BTreeMap<String, CanonicalJsonValue>,
create_event: &PduEvent, create_event: &PduEvent,
origin: &ServerName, origin: &ServerName,
@ -1320,13 +1315,10 @@ async fn upgrade_outlier_to_timeline_pdu(
} }
let create_event_content = let create_event_content =
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) serde_json::from_str::<CreateEventContent>(create_event.content.get()).map_err(|e| {
.expect("Raw::from_value always works.") warn!("Invalid create event: {}", e);
.deserialize() "Invalid create event in db.".to_owned()
.map_err(|e| { })?;
warn!("Invalid create event: {}", e);
"Invalid create event in db.".to_owned()
})?;
let room_version_id = &create_event_content.room_version; let room_version_id = &create_event_content.room_version;
let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); let room_version = RoomVersion::new(room_version_id).expect("room version is supported");
@ -1434,16 +1426,20 @@ async fn upgrade_outlier_to_timeline_pdu(
fork_states.push(state); fork_states.push(state);
} }
let event_cache = RefCell::new(HashMap::new());
state_at_incoming_event = match state_res::resolve( state_at_incoming_event = match state_res::resolve(
room_version_id, room_version_id,
&fork_states, &fork_states,
auth_chain_sets, auth_chain_sets,
|id| { |id| match event_cache.borrow_mut().entry(id.clone()) {
let res = db.rooms.get_pdu(id); hash_map::Entry::Vacant(v) => {
if let Err(e) = &res { let res = db.rooms.get_pdu(id);
error!("LOOK AT ME Failed to fetch event: {}", e); if let Err(e) = &res {
error!("LOOK AT ME Failed to fetch event: {}", e);
}
v.insert(res.ok().flatten().map(Arc::new)).clone()
} }
res.ok().flatten() hash_map::Entry::Occupied(o) => o.get().as_ref().map(Arc::clone),
}, },
) { ) {
Ok(new_state) => Some( Ok(new_state) => Some(
@ -1554,7 +1550,7 @@ async fn upgrade_outlier_to_timeline_pdu(
db.rooms db.rooms
.get_pdu(&incoming_pdu.auth_events[0]) .get_pdu(&incoming_pdu.auth_events[0])
.map_err(|e| e.to_string())? .map_err(|e| e.to_string())?
.filter(|maybe_create| **maybe_create == *create_event) .filter(|maybe_create| maybe_create == create_event)
} else { } else {
None None
}; };
@ -1562,7 +1558,7 @@ async fn upgrade_outlier_to_timeline_pdu(
let check_result = state_res::event_auth::auth_check( let check_result = state_res::event_auth::auth_check(
&room_version, &room_version,
&incoming_pdu, &incoming_pdu,
previous_create.as_deref(), previous_create.as_ref(),
None::<PduEvent>, // TODO: third party invite None::<PduEvent>, // TODO: third party invite
|k, s| { |k, s| {
db.rooms db.rooms
@ -1646,7 +1642,7 @@ async fn upgrade_outlier_to_timeline_pdu(
let soft_fail = !state_res::event_auth::auth_check( let soft_fail = !state_res::event_auth::auth_check(
&room_version, &room_version,
&incoming_pdu, &incoming_pdu,
previous_create.as_deref(), previous_create.as_ref(),
None::<PduEvent>, None::<PduEvent>,
|k, s| auth_events.get(&(k.clone(), s.to_owned())), |k, s| auth_events.get(&(k.clone(), s.to_owned())),
) )
@ -1791,16 +1787,20 @@ async fn upgrade_outlier_to_timeline_pdu(
.collect::<Result<Vec<_>>>() .collect::<Result<Vec<_>>>()
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
let event_cache = RefCell::new(HashMap::new());
let state = match state_res::resolve( let state = match state_res::resolve(
room_version_id, room_version_id,
fork_states, fork_states,
auth_chain_sets, auth_chain_sets,
|id| { |id| match event_cache.borrow_mut().entry(id.clone()) {
let res = db.rooms.get_pdu(id); hash_map::Entry::Vacant(v) => {
if let Err(e) = &res { let res = db.rooms.get_pdu(id);
error!("LOOK AT ME Failed to fetch event: {}", e); if let Err(e) = &res {
error!("LOOK AT ME Failed to fetch event: {}", e);
}
v.insert(res.ok().flatten().map(Arc::new)).clone()
} }
res.ok().flatten() hash_map::Entry::Occupied(o) => o.get().as_ref().map(Arc::clone),
}, },
) { ) {
Ok(new_state) => new_state, Ok(new_state) => new_state,
@ -1873,7 +1873,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
create_event: &'a PduEvent, create_event: &'a PduEvent,
room_id: &'a RoomId, room_id: &'a RoomId,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> { ) -> AsyncRecursiveType<'a, Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>> {
Box::pin(async move { Box::pin(async move {
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
hash_map::Entry::Vacant(e) => { hash_map::Entry::Vacant(e) => {
@ -2669,13 +2669,10 @@ pub fn create_join_event_template_route(
let create_event_content = create_event let create_event_content = create_event
.as_ref() .as_ref()
.map(|create_event| { .map(|create_event| {
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) serde_json::from_str::<CreateEventContent>(create_event.content.get()).map_err(|e| {
.expect("Raw::from_value always works.") warn!("Invalid create event: {}", e);
.deserialize() Error::bad_database("Invalid create event in db.")
.map_err(|e| { })
warn!("Invalid create event: {}", e);
Error::bad_database("Invalid create event in db.")
})
}) })
.transpose()?; .transpose()?;
@ -2702,16 +2699,19 @@ pub fn create_join_event_template_route(
)); ));
} }
let content = serde_json::to_value(MemberEventContent { let content = RawValue::from_string(
avatar_url: None, serde_json::to_string(&MemberEventContent {
blurhash: None, avatar_url: None,
displayname: None, blurhash: None,
is_direct: None, displayname: None,
membership: MembershipState::Join, is_direct: None,
third_party_invite: None, membership: MembershipState::Join,
reason: None, third_party_invite: None,
}) reason: None,
.expect("member event is valid value"); })
.expect("member event is valid value"),
)
.expect("string is valid");
let state_key = body.user_id.to_string(); let state_key = body.user_id.to_string();
let kind = EventType::RoomMember; let kind = EventType::RoomMember;
@ -2738,7 +2738,7 @@ pub fn create_join_event_template_route(
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
unsigned.insert( unsigned.insert(
"prev_sender".to_owned(), "prev_sender".to_owned(),
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"), serde_json::from_str(prev_pdu.sender.as_str()).expect("UserId is valid string"),
); );
} }
@ -2751,6 +2751,7 @@ pub fn create_join_event_template_route(
.expect("time is valid"), .expect("time is valid"),
kind, kind,
content, content,
parsed_content: RwLock::new(None),
state_key: Some(state_key), state_key: Some(state_key),
prev_events, prev_events,
depth, depth,
@ -2759,17 +2760,26 @@ pub fn create_join_event_template_route(
.map(|(_, pdu)| pdu.event_id.clone()) .map(|(_, pdu)| pdu.event_id.clone())
.collect(), .collect(),
redacts: None, redacts: None,
unsigned, unsigned: if unsigned.is_empty() {
None
} else {
Some(
RawValue::from_string(
serde_json::to_string(&unsigned).expect("to_string always works"),
)
.expect("string is valid"),
)
},
hashes: ruma::events::pdu::EventHash { hashes: ruma::events::pdu::EventHash {
sha256: "aaa".to_owned(), sha256: "aaa".to_owned(),
}, },
signatures: BTreeMap::new(), signatures: None,
}; };
let auth_check = state_res::auth_check( let auth_check = state_res::auth_check(
&room_version, &room_version,
&pdu, &pdu,
create_prev_event.as_deref(), create_prev_event,
None::<PduEvent>, // TODO: third_party_invite None::<PduEvent>, // TODO: third_party_invite
|k, s| auth_events.get(&(k.clone(), s.to_owned())), |k, s| auth_events.get(&(k.clone(), s.to_owned())),
) )
@ -3279,45 +3289,13 @@ pub(crate) async fn fetch_required_signing_keys(
// Gets a list of servers for which we don't have the signing key yet. We go over // Gets a list of servers for which we don't have the signing key yet. We go over
// the PDUs and either cache the key or add it to the list that needs to be retrieved. // the PDUs and either cache the key or add it to the list that needs to be retrieved.
fn get_server_keys_from_cache( pub fn get_server_keys_from_cache(
pdu: &Raw<Pdu>, pdu: &CanonicalJsonObject,
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>, servers: &mut BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>,
room_version: &RoomVersionId,
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>,
db: &Database, db: &Database,
) -> Result<()> { ) -> Result<()> {
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.json().get()).map_err(|e| { let signatures = pdu
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let event_id = EventId::try_from(&*format!(
"${}",
ruma::signatures::reference_hash(&value, room_version)
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
if let Some((time, tries)) = db
.globals
.bad_event_ratelimiter
.read()
.unwrap()
.get(&event_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", event_id);
return Err(Error::BadServerResponse("bad event, still backing off"));
}
}
let signatures = value
.get("signatures") .get("signatures")
.ok_or(Error::BadServerResponse( .ok_or(Error::BadServerResponse(
"No signatures in server response pdu.", "No signatures in server response pdu.",
@ -3369,30 +3347,11 @@ fn get_server_keys_from_cache(
} }
pub(crate) async fn fetch_join_signing_keys( pub(crate) async fn fetch_join_signing_keys(
event: &create_join_event::v2::Response, servers: Arc<RwLock<BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>>>,
room_version: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
db: &Database, db: &Database,
) -> Result<()> { ) -> Result<()> {
let mut servers = let mut servers = std::mem::take(&mut *servers.write().unwrap());
BTreeMap::<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>::new();
{
let mut pkm = pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?;
// Try to fetch keys, failure is okay
// Servers we couldn't find in the cache will be added to `servers`
for pdu in &event.room_state.state {
let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm, db);
}
for pdu in &event.room_state.auth_chain {
let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm, db);
}
drop(pkm);
}
if servers.is_empty() { if servers.is_empty() {
// We had all keys locally // We had all keys locally

Loading…
Cancel
Save