@ -1,12 +1,13 @@
use crate ::{
use crate ::{
client_server ::{ self , claim_keys_helper , get_keys_helper } ,
client_server ::{ self , claim_keys_helper , get_keys_helper } ,
database ::DatabaseGuard ,
utils , ConduitResult , Database , Error , PduEvent , Result , Ruma ,
utils , ConduitResult , Database , Error , PduEvent , Result , Ruma ,
} ;
} ;
use get_profile_information ::v1 ::ProfileField ;
use get_profile_information ::v1 ::ProfileField ;
use http ::header ::{ HeaderValue , AUTHORIZATION , HOST } ;
use http ::header ::{ HeaderValue , AUTHORIZATION , HOST } ;
use log ::{ debug , error , info , trace , warn } ;
use log ::{ debug , error , info , trace , warn } ;
use regex ::Regex ;
use regex ::Regex ;
use rocket ::{ response ::content ::Json , State } ;
use rocket ::response ::content ::Json ;
use ruma ::{
use ruma ::{
api ::{
api ::{
client ::error ::{ Error as RumaError , ErrorKind } ,
client ::error ::{ Error as RumaError , ErrorKind } ,
@ -45,7 +46,7 @@ use ruma::{
receipt ::ReceiptType ,
receipt ::ReceiptType ,
serde ::Raw ,
serde ::Raw ,
signatures ::{ CanonicalJsonObject , CanonicalJsonValue } ,
signatures ::{ CanonicalJsonObject , CanonicalJsonValue } ,
state_res ::{ self , Event , RoomVersion , StateMap } ,
state_res ::{ self , RoomVersion , StateMap } ,
to_device ::DeviceIdOrAllDevices ,
to_device ::DeviceIdOrAllDevices ,
uint , EventId , MilliSecondsSinceUnixEpoch , RoomId , RoomVersionId , ServerName ,
uint , EventId , MilliSecondsSinceUnixEpoch , RoomId , RoomVersionId , ServerName ,
ServerSigningKeyId , UserId ,
ServerSigningKeyId , UserId ,
@ -432,7 +433,7 @@ pub async fn request_well_known(
#[ cfg_attr(feature = " conduit_bin " , get( " /_matrix/federation/v1/version " )) ]
#[ cfg_attr(feature = " conduit_bin " , get( " /_matrix/federation/v1/version " )) ]
#[ tracing::instrument(skip(db)) ]
#[ tracing::instrument(skip(db)) ]
pub fn get_server_version_route (
pub fn get_server_version_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
) -> ConduitResult < get_server_version ::v1 ::Response > {
) -> ConduitResult < get_server_version ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
return Err ( Error ::bad_config ( "Federation is disabled." ) ) ;
return Err ( Error ::bad_config ( "Federation is disabled." ) ) ;
@ -450,7 +451,7 @@ pub fn get_server_version_route(
// Response type for this endpoint is Json because we need to calculate a signature for the response
// Response type for this endpoint is Json because we need to calculate a signature for the response
#[ cfg_attr(feature = " conduit_bin " , get( " /_matrix/key/v2/server " )) ]
#[ cfg_attr(feature = " conduit_bin " , get( " /_matrix/key/v2/server " )) ]
#[ tracing::instrument(skip(db)) ]
#[ tracing::instrument(skip(db)) ]
pub fn get_server_keys_route ( db : State < ' _ , Arc < Database > > ) -> Json < String > {
pub fn get_server_keys_route ( db : DatabaseGuard ) -> Json < String > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
// TODO: Use proper types
// TODO: Use proper types
return Json ( "Federation is disabled." . to_owned ( ) ) ;
return Json ( "Federation is disabled." . to_owned ( ) ) ;
@ -497,7 +498,7 @@ pub fn get_server_keys_route(db: State<'_, Arc<Database>>) -> Json<String> {
#[ cfg_attr(feature = " conduit_bin " , get( " /_matrix/key/v2/server/<_> " )) ]
#[ cfg_attr(feature = " conduit_bin " , get( " /_matrix/key/v2/server/<_> " )) ]
#[ tracing::instrument(skip(db)) ]
#[ tracing::instrument(skip(db)) ]
pub fn get_server_keys_deprecated_route ( db : State < ' _ , Arc < Database > > ) -> Json < String > {
pub fn get_server_keys_deprecated_route ( db : DatabaseGuard ) -> Json < String > {
get_server_keys_route ( db )
get_server_keys_route ( db )
}
}
@ -507,7 +508,7 @@ pub fn get_server_keys_deprecated_route(db: State<'_, Arc<Database>>) -> Json<St
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub async fn get_public_rooms_filtered_route (
pub async fn get_public_rooms_filtered_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_public_rooms_filtered ::v1 ::Request < ' _ > > ,
body : Ruma < get_public_rooms_filtered ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_public_rooms_filtered ::v1 ::Response > {
) -> ConduitResult < get_public_rooms_filtered ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -551,7 +552,7 @@ pub async fn get_public_rooms_filtered_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub async fn get_public_rooms_route (
pub async fn get_public_rooms_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_public_rooms ::v1 ::Request < ' _ > > ,
body : Ruma < get_public_rooms ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_public_rooms ::v1 ::Response > {
) -> ConduitResult < get_public_rooms ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -595,7 +596,7 @@ pub async fn get_public_rooms_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub async fn send_transaction_message_route (
pub async fn send_transaction_message_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < send_transaction_message ::v1 ::Request < ' _ > > ,
body : Ruma < send_transaction_message ::v1 ::Request < ' _ > > ,
) -> ConduitResult < send_transaction_message ::v1 ::Response > {
) -> ConduitResult < send_transaction_message ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -624,13 +625,44 @@ pub async fn send_transaction_message_route(
}
}
} ;
} ;
// 0. Check the server is in the room
let room_id = match value
. get ( "room_id" )
. and_then ( | id | RoomId ::try_from ( id . as_str ( ) ? ) . ok ( ) )
{
Some ( id ) = > id ,
None = > {
// Event is invalid
resolved_map . insert ( event_id , Err ( "Event needs a valid RoomId." . to_string ( ) ) ) ;
continue ;
}
} ;
let mutex = Arc ::clone (
db . globals
. roomid_mutex_federation
. write ( )
. unwrap ( )
. entry ( room_id . clone ( ) )
. or_default ( ) ,
) ;
let mutex_lock = mutex . lock ( ) . await ;
let start_time = Instant ::now ( ) ;
let start_time = Instant ::now ( ) ;
resolved_map . insert (
resolved_map . insert (
event_id . clone ( ) ,
event_id . clone ( ) ,
handle_incoming_pdu ( & body . origin , & event_id , value , true , & db , & pub_key_map )
handle_incoming_pdu (
& body . origin ,
& event_id ,
& room_id ,
value ,
true ,
& db ,
& pub_key_map ,
)
. await
. await
. map ( | _ | ( ) ) ,
. map ( | _ | ( ) ) ,
) ;
) ;
drop ( mutex_lock ) ;
let elapsed = start_time . elapsed ( ) ;
let elapsed = start_time . elapsed ( ) ;
if elapsed > Duration ::from_secs ( 1 ) {
if elapsed > Duration ::from_secs ( 1 ) {
@ -775,6 +807,8 @@ pub async fn send_transaction_message_route(
}
}
}
}
db . flush ( ) . await ? ;
Ok ( send_transaction_message ::v1 ::Response { pdus : resolved_map } . into ( ) )
Ok ( send_transaction_message ::v1 ::Response { pdus : resolved_map } . into ( ) )
}
}
@ -782,8 +816,8 @@ pub async fn send_transaction_message_route(
type AsyncRecursiveResult < ' a , T , E > = Pin < Box < dyn Future < Output = StdResult < T , E > > + ' a + Send > > ;
type AsyncRecursiveResult < ' a , T , E > = Pin < Box < dyn Future < Output = StdResult < T , E > > + ' a + Send > > ;
/// When receiving an event one needs to:
/// When receiving an event one needs to:
/// 0. Skip the PDU if we already know about it
/// 0. Check the server is in the room
/// 1. Check the server is in the room
/// 1. Skip the PDU if we already know about it
/// 2. Check signatures, otherwise drop
/// 2. Check signatures, otherwise drop
/// 3. Check content hash, redact if doesn't match
/// 3. Check content hash, redact if doesn't match
/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not
/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not
@ -808,6 +842,7 @@ type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E
pub fn handle_incoming_pdu < ' a > (
pub fn handle_incoming_pdu < ' a > (
origin : & ' a ServerName ,
origin : & ' a ServerName ,
event_id : & ' a EventId ,
event_id : & ' a EventId ,
room_id : & ' a RoomId ,
value : BTreeMap < String , CanonicalJsonValue > ,
value : BTreeMap < String , CanonicalJsonValue > ,
is_timeline_event : bool ,
is_timeline_event : bool ,
db : & ' a Database ,
db : & ' a Database ,
@ -815,24 +850,6 @@ pub fn handle_incoming_pdu<'a>(
) -> AsyncRecursiveResult < ' a , Option < Vec < u8 > > , String > {
) -> AsyncRecursiveResult < ' a , Option < Vec < u8 > > , String > {
Box ::pin ( async move {
Box ::pin ( async move {
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// 0. Skip the PDU if we already have it as a timeline event
if let Ok ( Some ( pdu_id ) ) = db . rooms . get_pdu_id ( & event_id ) {
return Ok ( Some ( pdu_id . to_vec ( ) ) ) ;
}
// 1. Check the server is in the room
let room_id = match value
. get ( "room_id" )
. and_then ( | id | RoomId ::try_from ( id . as_str ( ) ? ) . ok ( ) )
{
Some ( id ) = > id ,
None = > {
// Event is invalid
return Err ( "Event needs a valid RoomId." . to_string ( ) ) ;
}
} ;
match db . rooms . exists ( & room_id ) {
match db . rooms . exists ( & room_id ) {
Ok ( true ) = > { }
Ok ( true ) = > { }
_ = > {
_ = > {
@ -840,6 +857,11 @@ pub fn handle_incoming_pdu<'a>(
}
}
}
}
// 1. Skip the PDU if we already have it as a timeline event
if let Ok ( Some ( pdu_id ) ) = db . rooms . get_pdu_id ( & event_id ) {
return Ok ( Some ( pdu_id . to_vec ( ) ) ) ;
}
// We go through all the signatures we see on the value and fetch the corresponding signing
// We go through all the signatures we see on the value and fetch the corresponding signing
// keys
// keys
fetch_required_signing_keys ( & value , & pub_key_map , db )
fetch_required_signing_keys ( & value , & pub_key_map , db )
@ -899,7 +921,7 @@ pub fn handle_incoming_pdu<'a>(
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// EDIT: Step 5 is not applied anymore because it failed too often
// EDIT: Step 5 is not applied anymore because it failed too often
debug ! ( "Fetching auth events for {}" , incoming_pdu . event_id ) ;
debug ! ( "Fetching auth events for {}" , incoming_pdu . event_id ) ;
fetch_and_handle_events ( db , origin , & incoming_pdu . auth_events , pub_key_map )
fetch_and_handle_events ( db , origin , & incoming_pdu . auth_events , & room_id , pub_key_map )
. await
. await
. map_err ( | e | e . to_string ( ) ) ? ;
. map_err ( | e | e . to_string ( ) ) ? ;
@ -1000,13 +1022,13 @@ pub fn handle_incoming_pdu<'a>(
if incoming_pdu . prev_events . len ( ) = = 1 {
if incoming_pdu . prev_events . len ( ) = = 1 {
let prev_event = & incoming_pdu . prev_events [ 0 ] ;
let prev_event = & incoming_pdu . prev_events [ 0 ] ;
let state_vec = db
let state = db
. rooms
. rooms
. pdu_shortstatehash ( prev_event )
. pdu_shortstatehash ( prev_event )
. map_err ( | _ | "Failed talking to db" . to_owned ( ) ) ?
. map_err ( | _ | "Failed talking to db" . to_owned ( ) ) ?
. map ( | shortstatehash | db . rooms . state_full_ids ( shortstatehash ) . ok ( ) )
. map ( | shortstatehash | db . rooms . state_full_ids ( shortstatehash ) . ok ( ) )
. flatten ( ) ;
. flatten ( ) ;
if let Some ( mut state_vec ) = state_vec {
if let Some ( mut state ) = state {
if db
if db
. rooms
. rooms
. get_pdu ( prev_event )
. get_pdu ( prev_event )
@ -1016,10 +1038,16 @@ pub fn handle_incoming_pdu<'a>(
. state_key
. state_key
. is_some ( )
. is_some ( )
{
{
state_vec . push ( prev_event . clone ( ) ) ;
state . insert ( prev_event . clone ( ) ) ;
}
}
state_at_incoming_event = Some (
state_at_incoming_event = Some (
fetch_and_handle_events ( db , origin , & state_vec , pub_key_map )
fetch_and_handle_events (
db ,
origin ,
& state . into_iter ( ) . collect ::< Vec < _ > > ( ) ,
& room_id ,
pub_key_map ,
)
. await
. await
. map_err ( | _ | "Failed to fetch state events locally" . to_owned ( ) ) ?
. map_err ( | _ | "Failed to fetch state events locally" . to_owned ( ) ) ?
. into_iter ( )
. into_iter ( )
@ -1057,8 +1085,14 @@ pub fn handle_incoming_pdu<'a>(
{
{
Ok ( res ) = > {
Ok ( res ) = > {
debug ! ( "Fetching state events at event." ) ;
debug ! ( "Fetching state events at event." ) ;
let state_vec =
let state_vec = match fetch_and_handle_events (
match fetch_and_handle_events ( & db , origin , & res . pdu_ids , pub_key_map ) . await
& db ,
origin ,
& res . pdu_ids ,
& room_id ,
pub_key_map ,
)
. await
{
{
Ok ( state ) = > state ,
Ok ( state ) = > state ,
Err ( _ ) = > return Err ( "Failed to fetch state events." . to_owned ( ) ) ,
Err ( _ ) = > return Err ( "Failed to fetch state events." . to_owned ( ) ) ,
@ -1088,7 +1122,13 @@ pub fn handle_incoming_pdu<'a>(
}
}
debug ! ( "Fetching auth chain events at event." ) ;
debug ! ( "Fetching auth chain events at event." ) ;
match fetch_and_handle_events ( & db , origin , & res . auth_chain_ids , pub_key_map )
match fetch_and_handle_events (
& db ,
origin ,
& res . auth_chain_ids ,
& room_id ,
pub_key_map ,
)
. await
. await
{
{
Ok ( state ) = > state ,
Ok ( state ) = > state ,
@ -1217,18 +1257,10 @@ pub fn handle_incoming_pdu<'a>(
let mut auth_events = vec! [ ] ;
let mut auth_events = vec! [ ] ;
for map in & fork_states {
for map in & fork_states {
let mut state_auth = vec! [ ] ;
let state_auth = map
for auth_id in map . values ( ) . flat_map ( | pdu | & pdu . auth_events ) {
. values ( )
match fetch_and_handle_events ( & db , origin , & [ auth_id . clone ( ) ] , pub_key_map )
. flat_map ( | pdu | pdu . auth_events . clone ( ) )
. await
. collect ( ) ;
{
// This should always contain exactly one element when Ok
Ok ( events ) = > state_auth . extend_from_slice ( & events ) ,
Err ( e ) = > {
debug ! ( "Event was not present: {}" , e ) ;
}
}
}
auth_events . push ( state_auth ) ;
auth_events . push ( state_auth ) ;
}
}
@ -1243,10 +1275,7 @@ pub fn handle_incoming_pdu<'a>(
. collect ::< StateMap < _ > > ( )
. collect ::< StateMap < _ > > ( )
} )
} )
. collect ::< Vec < _ > > ( ) ,
. collect ::< Vec < _ > > ( ) ,
auth_events
auth_events ,
. into_iter ( )
. map ( | pdus | pdus . into_iter ( ) . map ( | pdu | pdu . event_id ( ) . clone ( ) ) . collect ( ) )
. collect ( ) ,
& | id | {
& | id | {
let res = db . rooms . get_pdu ( id ) ;
let res = db . rooms . get_pdu ( id ) ;
if let Err ( e ) = & res {
if let Err ( e ) = & res {
@ -1280,11 +1309,13 @@ pub fn handle_incoming_pdu<'a>(
pdu_id = Some (
pdu_id = Some (
append_incoming_pdu (
append_incoming_pdu (
& db ,
& db ,
& room_id ,
& incoming_pdu ,
& incoming_pdu ,
val ,
val ,
extremities ,
extremities ,
& state_at_incoming_event ,
& state_at_incoming_event ,
)
)
. await
. map_err ( | _ | "Failed to add pdu to db." . to_owned ( ) ) ? ,
. map_err ( | _ | "Failed to add pdu to db." . to_owned ( ) ) ? ,
) ;
) ;
debug ! ( "Appended incoming pdu." ) ;
debug ! ( "Appended incoming pdu." ) ;
@ -1322,6 +1353,7 @@ pub(crate) fn fetch_and_handle_events<'a>(
db : & ' a Database ,
db : & ' a Database ,
origin : & ' a ServerName ,
origin : & ' a ServerName ,
events : & ' a [ EventId ] ,
events : & ' a [ EventId ] ,
room_id : & ' a RoomId ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap < String , String > > > ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap < String , String > > > ,
) -> AsyncRecursiveResult < ' a , Vec < Arc < PduEvent > > , Error > {
) -> AsyncRecursiveResult < ' a , Vec < Arc < PduEvent > > , Error > {
Box ::pin ( async move {
Box ::pin ( async move {
@ -1375,6 +1407,7 @@ pub(crate) fn fetch_and_handle_events<'a>(
match handle_incoming_pdu (
match handle_incoming_pdu (
origin ,
origin ,
& event_id ,
& event_id ,
& room_id ,
value . clone ( ) ,
value . clone ( ) ,
false ,
false ,
db ,
db ,
@ -1581,32 +1614,38 @@ pub(crate) async fn fetch_signing_keys(
/// Append the incoming event setting the state snapshot to the state from the
/// Append the incoming event setting the state snapshot to the state from the
/// server that sent the event.
/// server that sent the event.
#[ tracing::instrument(skip(db)) ]
#[ tracing::instrument(skip(db)) ]
pub ( crate ) fn append_incoming_pdu (
async fn append_incoming_pdu (
db : & Database ,
db : & Database ,
room_id : & RoomId ,
pdu : & PduEvent ,
pdu : & PduEvent ,
pdu_json : CanonicalJsonObject ,
pdu_json : CanonicalJsonObject ,
new_room_leaves : HashSet < EventId > ,
new_room_leaves : HashSet < EventId > ,
state : & StateMap < Arc < PduEvent > > ,
state : & StateMap < Arc < PduEvent > > ,
) -> Result < Vec < u8 > > {
) -> Result < Vec < u8 > > {
let count = db . globals . next_count ( ) ? ;
let mutex = Arc ::clone (
let mut pdu_id = pdu . room_id . as_bytes ( ) . to_vec ( ) ;
db . globals
pdu_id . push ( 0xff ) ;
. roomid_mutex
pdu_id . extend_from_slice ( & count . to_be_bytes ( ) ) ;
. write ( )
. unwrap ( )
. entry ( room_id . clone ( ) )
. or_default ( ) ,
) ;
let mutex_lock = mutex . lock ( ) . await ;
// We append to state before appending the pdu, so we don't have a moment in time with the
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
// pdu without it's state. This is okay because append_pdu can't fail.
db . rooms
db . rooms
. set_event_state ( & pdu . event_id , state , & db . globals ) ? ;
. set_event_state ( & pdu . event_id , state , & db . globals ) ? ;
db . rooms . append_pdu (
let pdu_id = db . rooms . append_pdu (
pdu ,
pdu ,
pdu_json ,
pdu_json ,
count ,
& pdu_id ,
& new_room_leaves . into_iter ( ) . collect ::< Vec < _ > > ( ) ,
& new_room_leaves . into_iter ( ) . collect ::< Vec < _ > > ( ) ,
& db ,
& db ,
) ? ;
) ? ;
drop ( mutex_lock ) ;
for appservice in db . appservice . iter_all ( ) ? . filter_map ( | r | r . ok ( ) ) {
for appservice in db . appservice . iter_all ( ) ? . filter_map ( | r | r . ok ( ) ) {
if let Some ( namespaces ) = appservice . 1. get ( "namespaces" ) {
if let Some ( namespaces ) = appservice . 1. get ( "namespaces" ) {
let users = namespaces
let users = namespaces
@ -1674,7 +1713,7 @@ pub(crate) fn append_incoming_pdu(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_event_route (
pub fn get_event_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_event ::v1 ::Request < ' _ > > ,
body : Ruma < get_event ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_event ::v1 ::Response > {
) -> ConduitResult < get_event ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -1699,7 +1738,7 @@ pub fn get_event_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_missing_events_route (
pub fn get_missing_events_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_missing_events ::v1 ::Request < ' _ > > ,
body : Ruma < get_missing_events ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_missing_events ::v1 ::Response > {
) -> ConduitResult < get_missing_events ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -1748,7 +1787,7 @@ pub fn get_missing_events_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_event_authorization_route (
pub fn get_event_authorization_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_event_authorization ::v1 ::Request < ' _ > > ,
body : Ruma < get_event_authorization ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_event_authorization ::v1 ::Response > {
) -> ConduitResult < get_event_authorization ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -1792,7 +1831,7 @@ pub fn get_event_authorization_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_room_state_route (
pub fn get_room_state_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_room_state ::v1 ::Request < ' _ > > ,
body : Ruma < get_room_state ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_room_state ::v1 ::Response > {
) -> ConduitResult < get_room_state ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -1855,7 +1894,7 @@ pub fn get_room_state_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_room_state_ids_route (
pub fn get_room_state_ids_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_room_state_ids ::v1 ::Request < ' _ > > ,
body : Ruma < get_room_state_ids ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_room_state_ids ::v1 ::Response > {
) -> ConduitResult < get_room_state_ids ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -1870,7 +1909,11 @@ pub fn get_room_state_ids_route(
"Pdu state not found." ,
"Pdu state not found." ,
) ) ? ;
) ) ? ;
let pdu_ids = db . rooms . state_full_ids ( shortstatehash ) ? ;
let pdu_ids = db
. rooms
. state_full_ids ( shortstatehash ) ?
. into_iter ( )
. collect ( ) ;
let mut auth_chain_ids = BTreeSet ::< EventId > ::new ( ) ;
let mut auth_chain_ids = BTreeSet ::< EventId > ::new ( ) ;
let mut todo = BTreeSet ::new ( ) ;
let mut todo = BTreeSet ::new ( ) ;
@ -1907,7 +1950,7 @@ pub fn get_room_state_ids_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn create_join_event_template_route (
pub fn create_join_event_template_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < create_join_event_template ::v1 ::Request < ' _ > > ,
body : Ruma < create_join_event_template ::v1 ::Request < ' _ > > ,
) -> ConduitResult < create_join_event_template ::v1 ::Response > {
) -> ConduitResult < create_join_event_template ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2076,7 +2119,7 @@ pub fn create_join_event_template_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub async fn create_join_event_route (
pub async fn create_join_event_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < create_join_event ::v2 ::Request < ' _ > > ,
body : Ruma < create_join_event ::v2 ::Request < ' _ > > ,
) -> ConduitResult < create_join_event ::v2 ::Response > {
) -> ConduitResult < create_join_event ::v2 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2116,7 +2159,24 @@ pub async fn create_join_event_route(
)
)
. map_err ( | _ | Error ::BadRequest ( ErrorKind ::InvalidParam , "Origin field is invalid." ) ) ? ;
. map_err ( | _ | Error ::BadRequest ( ErrorKind ::InvalidParam , "Origin field is invalid." ) ) ? ;
let pdu_id = handle_incoming_pdu ( & origin , & event_id , value , true , & db , & pub_key_map )
let mutex = Arc ::clone (
db . globals
. roomid_mutex_federation
. write ( )
. unwrap ( )
. entry ( body . room_id . clone ( ) )
. or_default ( ) ,
) ;
let mutex_lock = mutex . lock ( ) . await ;
let pdu_id = handle_incoming_pdu (
& origin ,
& event_id ,
& body . room_id ,
value ,
true ,
& db ,
& pub_key_map ,
)
. await
. await
. map_err ( | _ | {
. map_err ( | _ | {
Error ::BadRequest (
Error ::BadRequest (
@ -2128,6 +2188,7 @@ pub async fn create_join_event_route(
ErrorKind ::InvalidParam ,
ErrorKind ::InvalidParam ,
"Could not accept incoming PDU as timeline event." ,
"Could not accept incoming PDU as timeline event." ,
) ) ? ;
) ) ? ;
drop ( mutex_lock ) ;
let state_ids = db . rooms . state_full_ids ( shortstatehash ) ? ;
let state_ids = db . rooms . state_full_ids ( shortstatehash ) ? ;
@ -2161,6 +2222,8 @@ pub async fn create_join_event_route(
db . sending . send_pdu ( & server , & pdu_id ) ? ;
db . sending . send_pdu ( & server , & pdu_id ) ? ;
}
}
db . flush ( ) . await ? ;
Ok ( create_join_event ::v2 ::Response {
Ok ( create_join_event ::v2 ::Response {
room_state : RoomState {
room_state : RoomState {
auth_chain : auth_chain_ids
auth_chain : auth_chain_ids
@ -2184,7 +2247,7 @@ pub async fn create_join_event_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub async fn create_invite_route (
pub async fn create_invite_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < create_invite ::v2 ::Request > ,
body : Ruma < create_invite ::v2 ::Request > ,
) -> ConduitResult < create_invite ::v2 ::Response > {
) -> ConduitResult < create_invite ::v2 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2277,6 +2340,8 @@ pub async fn create_invite_route(
) ? ;
) ? ;
}
}
db . flush ( ) . await ? ;
Ok ( create_invite ::v2 ::Response {
Ok ( create_invite ::v2 ::Response {
event : PduEvent ::convert_to_outgoing_federation_event ( signed_event ) ,
event : PduEvent ::convert_to_outgoing_federation_event ( signed_event ) ,
}
}
@ -2289,7 +2354,7 @@ pub async fn create_invite_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_devices_route (
pub fn get_devices_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_devices ::v1 ::Request < ' _ > > ,
body : Ruma < get_devices ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_devices ::v1 ::Response > {
) -> ConduitResult < get_devices ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2329,7 +2394,7 @@ pub fn get_devices_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_room_information_route (
pub fn get_room_information_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_room_information ::v1 ::Request < ' _ > > ,
body : Ruma < get_room_information ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_room_information ::v1 ::Response > {
) -> ConduitResult < get_room_information ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2357,7 +2422,7 @@ pub fn get_room_information_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_profile_information_route (
pub fn get_profile_information_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_profile_information ::v1 ::Request < ' _ > > ,
body : Ruma < get_profile_information ::v1 ::Request < ' _ > > ,
) -> ConduitResult < get_profile_information ::v1 ::Response > {
) -> ConduitResult < get_profile_information ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2390,8 +2455,8 @@ pub fn get_profile_information_route(
post ( "/_matrix/federation/v1/user/keys/query" , data = "<body>" )
post ( "/_matrix/federation/v1/user/keys/query" , data = "<body>" )
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub fn get_keys_route (
pub async fn get_keys_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < get_keys ::v1 ::Request > ,
body : Ruma < get_keys ::v1 ::Request > ,
) -> ConduitResult < get_keys ::v1 ::Response > {
) -> ConduitResult < get_keys ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {
@ -2405,6 +2470,8 @@ pub fn get_keys_route(
& db ,
& db ,
) ? ;
) ? ;
db . flush ( ) . await ? ;
Ok ( get_keys ::v1 ::Response {
Ok ( get_keys ::v1 ::Response {
device_keys : result . device_keys ,
device_keys : result . device_keys ,
master_keys : result . master_keys ,
master_keys : result . master_keys ,
@ -2419,7 +2486,7 @@ pub fn get_keys_route(
) ]
) ]
#[ tracing::instrument(skip(db, body)) ]
#[ tracing::instrument(skip(db, body)) ]
pub async fn claim_keys_route (
pub async fn claim_keys_route (
db : State < ' _ , Arc < Database > > ,
db : DatabaseGuard ,
body : Ruma < claim_keys ::v1 ::Request > ,
body : Ruma < claim_keys ::v1 ::Request > ,
) -> ConduitResult < claim_keys ::v1 ::Response > {
) -> ConduitResult < claim_keys ::v1 ::Response > {
if ! db . globals . allow_federation ( ) {
if ! db . globals . allow_federation ( ) {