@ -5,6 +5,7 @@ use crate::{
@@ -5,6 +5,7 @@ use crate::{
server_server , utils , ConduitResult , Database , Error , Result , Ruma ,
} ;
use member ::{ MemberEventContent , MembershipState } ;
use rayon ::prelude ::* ;
use ruma ::{
api ::{
client ::{
@ -15,7 +16,10 @@ use ruma::{
@@ -15,7 +16,10 @@ use ruma::{
unban_user , IncomingThirdPartySigned ,
} ,
} ,
federation ::{ self , membership ::create_invite } ,
federation ::{
self ,
membership ::{ create_invite , create_join_event } ,
} ,
} ,
events ::{
pdu ::Pdu ,
@ -26,13 +30,13 @@ use ruma::{
@@ -26,13 +30,13 @@ use ruma::{
state_res ::{ self , RoomVersion } ,
uint , EventId , RoomId , RoomVersionId , ServerName , UserId ,
} ;
use serde_json ::value ::RawValue ;
use std ::{
collections ::{ hash_map ::Entry , BTreeMap , HashMap , HashSet } ,
collections ::{ BTreeMap , HashMap , HashSet } ,
convert ::{ TryFrom , TryInto } ,
sync ::{ Arc , RwLock } ,
time ::{ Duration , Instant } ,
} ;
use tracing ::{ debug , error , warn } ;
use tracing ::{ error , warn } ;
#[ cfg(feature = " conduit_bin " ) ]
use rocket ::{ get , post } ;
@ -204,7 +208,7 @@ pub async fn kick_user_route(
@@ -204,7 +208,7 @@ pub async fn kick_user_route(
) -> ConduitResult < kick_user ::Response > {
let sender_user = body . sender_user . as_ref ( ) . expect ( "user is authenticated" ) ;
let mut event = serde_json ::from_value ::< Raw < ruma ::events ::room ::member ::MemberEventContent > > (
let mut event = serde_json ::from_str ::< Raw < ruma ::events ::room ::member ::MemberEventContent > > (
db . rooms
. room_state_get (
& body . room_id ,
@ -216,7 +220,7 @@ pub async fn kick_user_route(
@@ -216,7 +220,7 @@ pub async fn kick_user_route(
"Cannot kick member that's not in the room." ,
) ) ?
. content
. clone ( ) ,
. get ( ) ,
)
. expect ( "Raw::from_value always works" )
. deserialize ( )
@ -238,7 +242,10 @@ pub async fn kick_user_route(
@@ -238,7 +242,10 @@ pub async fn kick_user_route(
db . rooms . build_and_append_pdu (
PduBuilder {
event_type : EventType ::RoomMember ,
content : serde_json ::to_value ( event ) . expect ( "event is valid, we just created it" ) ,
content : RawValue ::from_string (
serde_json ::to_string ( & event ) . expect ( "event is valid, we just created it" ) ,
)
. expect ( "string is valid" ) ,
unsigned : None ,
state_key : Some ( body . user_id . to_string ( ) ) ,
redacts : None ,
@ -290,12 +297,9 @@ pub async fn ban_user_route(
@@ -290,12 +297,9 @@ pub async fn ban_user_route(
reason : None ,
} ) ,
| event | {
let mut event = serde_json ::from_value ::< Raw < member ::MemberEventContent > > (
event . content . clone ( ) ,
)
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid member event in database." ) ) ? ;
let mut event =
serde_json ::from_str ::< member ::MemberEventContent > ( event . content . get ( ) )
. map_err ( | _ | Error ::bad_database ( "Invalid member event in database." ) ) ? ;
event . membership = ruma ::events ::room ::member ::MembershipState ::Ban ;
Ok ( event )
} ,
@ -314,7 +318,10 @@ pub async fn ban_user_route(
@@ -314,7 +318,10 @@ pub async fn ban_user_route(
db . rooms . build_and_append_pdu (
PduBuilder {
event_type : EventType ::RoomMember ,
content : serde_json ::to_value ( event ) . expect ( "event is valid, we just created it" ) ,
content : RawValue ::from_string (
serde_json ::to_string ( & event ) . expect ( "event is valid, we just created it" ) ,
)
. expect ( "string is valid" ) ,
unsigned : None ,
state_key : Some ( body . user_id . to_string ( ) ) ,
redacts : None ,
@ -346,7 +353,7 @@ pub async fn unban_user_route(
@@ -346,7 +353,7 @@ pub async fn unban_user_route(
) -> ConduitResult < unban_user ::Response > {
let sender_user = body . sender_user . as_ref ( ) . expect ( "user is authenticated" ) ;
let mut event = serde_json ::from_value ::< Raw < ruma ::events ::room ::member ::MemberEventContent > > (
let mut event = serde_json ::from_str :: < ruma ::events ::room ::member ::MemberEventContent > (
db . rooms
. room_state_get (
& body . room_id ,
@ -358,10 +365,8 @@ pub async fn unban_user_route(
@@ -358,10 +365,8 @@ pub async fn unban_user_route(
"Cannot unban a user who is not banned." ,
) ) ?
. content
. clone ( ) ,
. get ( ) ,
)
. expect ( "from_value::<Raw<..>> can never fail" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid member event in database." ) ) ? ;
event . membership = ruma ::events ::room ::member ::MembershipState ::Leave ;
@ -379,7 +384,10 @@ pub async fn unban_user_route(
@@ -379,7 +384,10 @@ pub async fn unban_user_route(
db . rooms . build_and_append_pdu (
PduBuilder {
event_type : EventType ::RoomMember ,
content : serde_json ::to_value ( event ) . expect ( "event is valid, we just created it" ) ,
content : RawValue ::from_string (
serde_json ::to_string ( & event ) . expect ( "event is valid, we just created it" ) ,
)
. expect ( "string is valid" ) ,
unsigned : None ,
state_key : Some ( body . user_id . to_string ( ) ) ,
redacts : None ,
@ -563,6 +571,7 @@ async fn join_room_by_id_helper(
@@ -563,6 +571,7 @@ async fn join_room_by_id_helper(
} ,
)
. await ;
warn ! ( "Make join done" ) ;
make_join_response_and_server = make_join_response . map ( | r | ( r , remote_server ) ) ;
@ -658,93 +667,193 @@ async fn join_room_by_id_helper(
@@ -658,93 +667,193 @@ async fn join_room_by_id_helper(
)
. await ? ;
warn ! ( "Send join done" ) ;
db . rooms . get_or_create_shortroomid ( room_id , & db . globals ) ? ;
let pdu = PduEvent ::from_id_val ( & event_id , join_event . clone ( ) )
. map_err ( | _ | Error ::BadServerResponse ( "Invalid join event PDU." ) ) ? ;
let mut state = Hash Map ::new ( ) ;
let pub_key_map = RwLock ::new ( BTreeMap ::new ( ) ) ;
let pub_key_map = Arc ::new ( RwLock ::new ( BTree Map ::new ( ) ) ) ;
let missing_servers = Arc ::new ( RwLock ::new ( BTreeMap ::new ( ) ) ) ;
server_server ::fetch_join_signing_keys (
& send_join_response ,
& room_version ,
& pub_key_map ,
db ,
)
. await ? ;
for result in send_join_response
. room_state
. state
. iter ( )
. map ( | pdu | validate_and_add_event_id ( pdu , & room_version , & pub_key_map , db ) )
{
let ( event_id , value ) = match result {
Ok ( t ) = > t ,
Err ( _ ) = > continue ,
} ;
let pdu = PduEvent ::from_id_val ( & event_id , value . clone ( ) ) . map_err ( | e | {
warn ! ( "{:?}: {}" , value , e ) ;
Error ::BadServerResponse ( "Invalid PDU in send_join response." )
} ) ? ;
db . rooms . add_pdu_outlier ( & event_id , & value ) ? ;
if let Some ( state_key ) = & pdu . state_key {
let shortstatekey =
db . rooms
. get_or_create_shortstatekey ( & pdu . kind , state_key , & db . globals ) ? ;
state . insert ( shortstatekey , pdu . event_id . clone ( ) ) ;
}
}
let incoming_shortstatekey = db . rooms . get_or_create_shortstatekey (
& pdu . kind ,
pdu . state_key
. as_ref ( )
. expect ( "Pdu is a membership state event" ) ,
& db . globals ,
) ? ;
state . insert ( incoming_shortstatekey , pdu . event_id . clone ( ) ) ;
let create_join_event ::RoomState {
state : mut room_state_state ,
auth_chain : mut room_state_auth_chain ,
} = send_join_response . room_state ;
let create_shortstatekey = db
. rooms
. get_shortstatekey ( & EventType ::RoomCreate , "" ) ?
. expect ( "Room exists" ) ;
if state . get ( & create_shortstatekey ) . is_none ( ) {
let mut saw_create_event = false ;
warn ! ( "Parsing send join response state" ) ;
const CHUNK_SIZE : usize = 500 ;
let mut parsed_state = room_state_state
. par_chunks_mut ( CHUNK_SIZE )
. filter_map ( | pdus | {
let mut r = HashMap ::with_capacity ( CHUNK_SIZE ) ;
for pdu in pdus {
let ( id , value ) = get_event_id ( & pdu , & room_version ) . ok ( ) ? ;
r . insert ( id , value ) ;
}
let mut missing_servers = missing_servers . write ( ) . unwrap ( ) ;
let mut pub_key_map = pub_key_map . write ( ) . unwrap ( ) ;
for ( _ , value ) in & r {
server_server ::get_server_keys_from_cache (
& value ,
& mut missing_servers ,
& mut pub_key_map ,
& db ,
)
. ok ( ) ? ;
}
Some ( r )
} )
. collect ::< Vec < _ > > ( ) ;
warn ! ( "Parsing send join response auth chain" ) ;
let mut parsed_chain = room_state_auth_chain
. par_chunks_mut ( CHUNK_SIZE )
. filter_map ( | pdus | {
let mut r = HashMap ::with_capacity ( CHUNK_SIZE ) ;
for pdu in pdus {
let ( id , value ) = get_event_id ( & pdu , & room_version ) . ok ( ) ? ;
r . insert ( id , value ) ;
}
let mut missing_servers = missing_servers . write ( ) . unwrap ( ) ;
let mut pub_key_map = pub_key_map . write ( ) . unwrap ( ) ;
for ( _ , value ) in & r {
server_server ::get_server_keys_from_cache (
& value ,
& mut missing_servers ,
& mut pub_key_map ,
& db ,
)
. ok ( ) ? ;
}
Some ( r )
} )
. collect ::< Vec < _ > > ( ) ;
warn ! ( "Fetching send join signing keys" ) ;
server_server ::fetch_join_signing_keys ( missing_servers , & pub_key_map , db ) . await ? ;
warn ! ( "Validating state" ) ;
parsed_state . par_iter_mut ( ) . for_each ( | chunk | {
let mut bad_events = Vec ::new ( ) ;
for ( event_id , value ) in chunk . iter_mut ( ) {
if let Err ( e ) = ruma ::signatures ::verify_event (
& * pub_key_map . read ( ) . unwrap ( ) ,
& value ,
& room_version ,
) {
warn ! ( "Event {} failed verification {:?} {}" , event_id , value , e ) ;
bad_events . push ( event_id . clone ( ) ) ;
continue ;
}
value . insert (
"event_id" . to_owned ( ) ,
CanonicalJsonValue ::String ( event_id . as_str ( ) . to_owned ( ) ) ,
) ;
}
for id in bad_events {
chunk . remove ( & id ) ;
}
} ) ;
warn ! ( "Inserting state" ) ;
db . rooms
. add_pdu_outlier_batch ( & mut parsed_state . iter ( ) . flatten ( ) ) ? ;
warn ! ( "Compressing state" ) ;
let state = parsed_state
. iter ( )
. flatten ( )
. map ( | ( event_id , value ) | {
let kind = if let Some ( s ) = value . get ( "type" ) . and_then ( | s | s . as_str ( ) ) {
s
} else {
warn ! ( "Event {} has no type: {:?}" , event_id , value ) ;
return Ok ( None ) ;
} ;
if let Some ( state_key ) = value . get ( "state_key" ) . and_then ( | s | s . as_str ( ) ) {
let shortstatekey = db . rooms . get_or_create_shortstatekey (
& EventType ::from ( kind ) ,
state_key ,
& db . globals ,
) ? ;
if shortstatekey = = create_shortstatekey {
saw_create_event = true ;
}
Ok ( Some ( db . rooms . compress_state_event (
shortstatekey ,
& event_id ,
& db . globals ,
) ? ) )
} else {
Ok ( None )
}
} )
. filter_map ( | r | r . transpose ( ) )
. collect ::< Result < HashSet < _ > > > ( ) ? ;
if ! saw_create_event {
return Err ( Error ::BadServerResponse ( "State contained no create event." ) ) ;
}
db . rooms . force_state (
warn ! ( "Validating chain" ) ;
parsed_chain . par_iter_mut ( ) . for_each ( | chunk | {
let mut bad_events = Vec ::new ( ) ;
for ( event_id , value ) in chunk . iter_mut ( ) {
if let Err ( e ) = ruma ::signatures ::verify_event (
& * pub_key_map . read ( ) . unwrap ( ) ,
& value ,
& room_version ,
) {
warn ! ( "Event {} failed verification {:?} {}" , event_id , value , e ) ;
bad_events . push ( event_id . clone ( ) ) ;
continue ;
}
value . insert (
"event_id" . to_owned ( ) ,
CanonicalJsonValue ::String ( event_id . as_str ( ) . to_owned ( ) ) ,
) ;
}
for id in bad_events {
chunk . remove ( & id ) ;
}
} ) ;
warn ! ( "Inserting chain" ) ;
db . rooms
. add_pdu_outlier_batch ( & mut parsed_chain . iter ( ) . flatten ( ) ) ? ;
warn ! ( "Forcing state of room" ) ;
db . rooms . force_state_new (
room_id ,
state
. into_iter ( )
. map ( | ( k , id ) | db . rooms . compress_state_event ( k , & id , & db . globals ) )
. collect ::< Result < HashSet < _ > > > ( ) ? ,
state ,
& mut parsed_state . iter ( ) . flat_map ( | m | m . values ( ) ) ,
db ,
) ? ;
for result in send_join_response
. room_state
. auth_chain
. iter ( )
. map ( | pdu | validate_and_add_event_id ( pdu , & room_version , & pub_key_map , db ) )
{
let ( event_id , value ) = match result {
Ok ( t ) = > t ,
Err ( _ ) = > continue ,
} ;
db . rooms . add_pdu_outlier ( & event_id , & value ) ? ;
}
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
warn ! ( "Appending join event to state" ) ;
let statehashid = db . rooms . append_to_state ( & pdu , & db . globals ) ? ;
warn ! ( "Adding join event to db" ) ;
db . rooms . append_pdu (
& pdu ,
utils ::to_canonical_object ( & pdu ) . expect ( "Pdu is valid canonical object" ) ,
@ -752,6 +861,7 @@ async fn join_room_by_id_helper(
@@ -752,6 +861,7 @@ async fn join_room_by_id_helper(
db ,
) ? ;
warn ! ( "Updating room state to join event" ) ;
// We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist
db . rooms . set_room_state ( room_id , statehashid ) ? ;
@ -769,7 +879,10 @@ async fn join_room_by_id_helper(
@@ -769,7 +879,10 @@ async fn join_room_by_id_helper(
db . rooms . build_and_append_pdu (
PduBuilder {
event_type : EventType ::RoomMember ,
content : serde_json ::to_value ( event ) . expect ( "event is valid, we just created it" ) ,
content : RawValue ::from_string (
serde_json ::to_string ( & event ) . expect ( "event is valid, we just created it" ) ,
)
. expect ( "string is valid" ) ,
unsigned : None ,
state_key : Some ( sender_user . to_string ( ) ) ,
redacts : None ,
@ -788,16 +901,15 @@ async fn join_room_by_id_helper(
@@ -788,16 +901,15 @@ async fn join_room_by_id_helper(
Ok ( join_room_by_id ::Response ::new ( room_id . clone ( ) ) . into ( ) )
}
fn validate_and_add _event_id(
fn get _event_id(
pdu : & Raw < Pdu > ,
room_version : & RoomVersionId ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap < String , String > > > ,
db : & Database ,
) -> Result < ( EventId , CanonicalJsonObject ) > {
let mut value = serde_json ::from_str ::< CanonicalJsonObject > ( pdu . json ( ) . get ( ) ) . map_err ( | e | {
error ! ( "Invalid PDU in server response: {:?}: {:?}" , pdu , e ) ;
let value = serde_json ::from_str ::< CanonicalJsonObject > ( pdu . json ( ) . get ( ) ) . map_err ( | e | {
warn ! ( "Invalid PDU in server response: {:?}: {:?}" , pdu , e ) ;
Error ::BadServerResponse ( "Invalid PDU in server response" )
} ) ? ;
let event_id = EventId ::try_from ( & * format! (
"${}" ,
ruma ::signatures ::reference_hash ( & value , room_version )
@ -805,49 +917,6 @@ fn validate_and_add_event_id(
@@ -805,49 +917,6 @@ fn validate_and_add_event_id(
) )
. expect ( "ruma's reference hashes are valid event ids" ) ;
let back_off = | id | match db . globals . bad_event_ratelimiter . write ( ) . unwrap ( ) . entry ( id ) {
Entry ::Vacant ( e ) = > {
e . insert ( ( Instant ::now ( ) , 1 ) ) ;
}
Entry ::Occupied ( mut e ) = > * e . get_mut ( ) = ( Instant ::now ( ) , e . get ( ) . 1 + 1 ) ,
} ;
if let Some ( ( time , tries ) ) = db
. globals
. bad_event_ratelimiter
. read ( )
. unwrap ( )
. get ( & event_id )
{
// Exponential backoff
let mut min_elapsed_duration = Duration ::from_secs ( 30 ) * ( * tries ) * ( * tries ) ;
if min_elapsed_duration > Duration ::from_secs ( 60 * 60 * 24 ) {
min_elapsed_duration = Duration ::from_secs ( 60 * 60 * 24 ) ;
}
if time . elapsed ( ) < min_elapsed_duration {
debug ! ( "Backing off from {}" , event_id ) ;
return Err ( Error ::BadServerResponse ( "bad event, still backing off" ) ) ;
}
}
if let Err ( e ) = ruma ::signatures ::verify_event (
& * pub_key_map
. read ( )
. map_err ( | _ | Error ::bad_database ( "RwLock is poisoned." ) ) ? ,
& value ,
room_version ,
) {
warn ! ( "Event {} failed verification {:?} {}" , event_id , pdu , e ) ;
back_off ( event_id ) ;
return Err ( Error ::BadServerResponse ( "Event failed verification." ) ) ;
}
value . insert (
"event_id" . to_owned ( ) ,
CanonicalJsonValue ::String ( event_id . as_str ( ) . to_owned ( ) ) ,
) ;
Ok ( ( event_id , value ) )
}
@ -884,7 +953,7 @@ pub(crate) async fn invite_helper<'a>(
@@ -884,7 +953,7 @@ pub(crate) async fn invite_helper<'a>(
let create_event_content = create_event
. as_ref ( )
. map ( | create_event | {
serde_json ::from_value ::< Raw < CreateEventContent > > ( create_event . content . clone ( ) )
serde_json ::from_str ::< Raw < CreateEventContent > > ( create_event . content . get ( ) )
. expect ( "Raw::from_value always works." )
. deserialize ( )
. map_err ( | e | {
@ -910,16 +979,19 @@ pub(crate) async fn invite_helper<'a>(
@@ -910,16 +979,19 @@ pub(crate) async fn invite_helper<'a>(
let room_version =
RoomVersion ::new ( & room_version_id ) . expect ( "room version is supported" ) ;
let content = serde_json ::to_value ( MemberEventContent {
avatar_url : None ,
displayname : None ,
is_direct : Some ( is_direct ) ,
membership : MembershipState ::Invite ,
third_party_invite : None ,
blurhash : None ,
reason : None ,
} )
. expect ( "member event is valid value" ) ;
let content = RawValue ::from_string (
serde_json ::to_string ( & MemberEventContent {
avatar_url : None ,
displayname : None ,
is_direct : Some ( is_direct ) ,
membership : MembershipState ::Invite ,
third_party_invite : None ,
blurhash : None ,
reason : None ,
} )
. expect ( "member event is valid value" ) ,
)
. expect ( "string is valid" ) ;
let state_key = user_id . to_string ( ) ;
let kind = EventType ::RoomMember ;
@ -946,7 +1018,7 @@ pub(crate) async fn invite_helper<'a>(
@@ -946,7 +1018,7 @@ pub(crate) async fn invite_helper<'a>(
unsigned . insert ( "prev_content" . to_owned ( ) , prev_pdu . content . clone ( ) ) ;
unsigned . insert (
"prev_sender" . to_owned ( ) ,
serde_json ::to_value ( & prev_pdu . sender ) . expect ( "UserId::to_value always works " ) ,
serde_json ::from_str ( prev_pdu . sender . as_str ( ) ) . expect ( "UserId is valid string " ) ,
) ;
}
@ -959,6 +1031,7 @@ pub(crate) async fn invite_helper<'a>(
@@ -959,6 +1031,7 @@ pub(crate) async fn invite_helper<'a>(
. expect ( "time is valid" ) ,
kind ,
content ,
parsed_content : RwLock ::new ( None ) ,
state_key : Some ( state_key ) ,
prev_events ,
depth ,
@ -967,11 +1040,20 @@ pub(crate) async fn invite_helper<'a>(
@@ -967,11 +1040,20 @@ pub(crate) async fn invite_helper<'a>(
. map ( | ( _ , pdu ) | pdu . event_id . clone ( ) )
. collect ( ) ,
redacts : None ,
unsigned ,
unsigned : if unsigned . is_empty ( ) {
None
} else {
Some (
RawValue ::from_string (
serde_json ::to_string ( & unsigned ) . expect ( "to_string always works" ) ,
)
. expect ( "string is valid" ) ,
)
} ,
hashes : ruma ::events ::pdu ::EventHash {
sha256 : "aaa" . to_owned ( ) ,
} ,
signatures : BTreeMap ::new ( ) ,
signatures : None ,
} ;
let auth_check = state_res ::auth_check (
@ -1116,16 +1198,19 @@ pub(crate) async fn invite_helper<'a>(
@@ -1116,16 +1198,19 @@ pub(crate) async fn invite_helper<'a>(
db . rooms . build_and_append_pdu (
PduBuilder {
event_type : EventType ::RoomMember ,
content : serde_json ::to_value ( member ::MemberEventContent {
membership : member ::MembershipState ::Invite ,
displayname : db . users . displayname ( user_id ) ? ,
avatar_url : db . users . avatar_url ( user_id ) ? ,
is_direct : Some ( is_direct ) ,
third_party_invite : None ,
blurhash : db . users . blurhash ( user_id ) ? ,
reason : None ,
} )
. expect ( "event is valid, we just created it" ) ,
content : RawValue ::from_string (
serde_json ::to_string ( & member ::MemberEventContent {
membership : member ::MembershipState ::Invite ,
displayname : db . users . displayname ( user_id ) ? ,
avatar_url : db . users . avatar_url ( user_id ) ? ,
is_direct : Some ( is_direct ) ,
third_party_invite : None ,
blurhash : db . users . blurhash ( user_id ) ? ,
reason : None ,
} )
. expect ( "event is valid, we just created it" ) ,
)
. expect ( "string is valid" ) ,
unsigned : None ,
state_key : Some ( user_id . to_string ( ) ) ,
redacts : None ,