@ -88,7 +88,9 @@ pub async fn sync_events_route(
let we_have_to_wait = rx . borrow ( ) . is_none ( ) ;
let we_have_to_wait = rx . borrow ( ) . is_none ( ) ;
if we_have_to_wait {
if we_have_to_wait {
let _ = rx . changed ( ) . await ;
if let Err ( e ) = rx . changed ( ) . await {
error ! ( "Error waiting for sync: {}" , e ) ;
}
}
}
let result = match rx
let result = match rx
@ -222,13 +224,16 @@ async fn sync_helper(
// Database queries:
// Database queries:
let current_shortstatehash = db . rooms . current_shortstatehash ( & room_id ) ? ;
let current_shortstatehash = db
. rooms
. current_shortstatehash ( & room_id ) ?
. expect ( "All rooms have state" ) ;
// These type is Option<Option<_>>. The outer Option is None when there is no event between
let first_pdu_before_since = db
// since and the current room state, meaning there should be no updates.
. rooms
// The inner Option is None when there is an event, but there is no state hash associated
. pdus_until ( & sender_user , & room_id , since )
// with it. This can happen for the RoomCreate event, so all updates should arrive.
. next ( )
let first_pdu_before_since = db . rooms . pdus_until ( & sender_user , & room_id , since ) . next ( ) ;
. transpose ( ) ? ;
let pdus_after_since = db
let pdus_after_since = db
. rooms
. rooms
@ -236,11 +241,78 @@ async fn sync_helper(
. next ( )
. next ( )
. is_some ( ) ;
. is_some ( ) ;
let since_shortstatehash = first_pdu_before_since . as_ref ( ) . map ( | pdu | {
let since_shortstatehash = first_pdu_before_since
db . rooms
. as_ref ( )
. pdu_shortstatehash ( & pdu . as_ref ( ) . ok ( ) ? . 1. event_id )
. map ( | pdu | {
. ok ( ) ?
db . rooms
} ) ;
. pdu_shortstatehash ( & pdu . 1. event_id )
. transpose ( )
. expect ( "all pdus have state" )
} )
. transpose ( ) ? ;
// Calculates joined_member_count, invited_member_count and heroes
let calculate_counts = | | {
let joined_member_count = db . rooms . room_members ( & room_id ) . count ( ) ;
let invited_member_count = db . rooms . room_members_invited ( & room_id ) . count ( ) ;
// Recalculate heroes (first 5 members)
let mut heroes = Vec ::new ( ) ;
if joined_member_count + invited_member_count < = 5 {
// Go through all PDUs and for each member event, check if the user is still joined or
// invited until we have 5 or we reach the end
for hero in db
. rooms
. all_pdus ( & sender_user , & room_id )
. filter_map ( | pdu | pdu . ok ( ) ) // Ignore all broken pdus
. filter ( | ( _ , pdu ) | pdu . kind = = EventType ::RoomMember )
. map ( | ( _ , pdu ) | {
let content = serde_json ::from_value ::<
ruma ::events ::room ::member ::MemberEventContent ,
> ( pdu . content . clone ( ) )
. map_err ( | _ | Error ::bad_database ( "Invalid member event in database." ) ) ? ;
if let Some ( state_key ) = & pdu . state_key {
let user_id = UserId ::try_from ( state_key . clone ( ) ) . map_err ( | _ | {
Error ::bad_database ( "Invalid UserId in member PDU." )
} ) ? ;
// The membership was and still is invite or join
if matches! (
content . membership ,
MembershipState ::Join | MembershipState ::Invite
) & & ( db . rooms . is_joined ( & user_id , & room_id ) ?
| | db . rooms . is_invited ( & user_id , & room_id ) ? )
{
Ok ::< _ , Error > ( Some ( state_key . clone ( ) ) )
} else {
Ok ( None )
}
} else {
Ok ( None )
}
} )
// Filter out buggy users
. filter_map ( | u | u . ok ( ) )
// Filter for possible heroes
. flatten ( )
{
if heroes . contains ( & hero ) | | hero = = sender_user . as_str ( ) {
continue ;
}
heroes . push ( hero ) ;
}
}
(
Some ( joined_member_count ) ,
Some ( invited_member_count ) ,
heroes ,
)
} ;
let (
let (
heroes ,
heroes ,
@ -248,63 +320,107 @@ async fn sync_helper(
invited_member_count ,
invited_member_count ,
joined_since_last_sync ,
joined_since_last_sync ,
state_events ,
state_events ,
) = if pdus_after_since & & Some ( current_shortstatehash ) ! = since_shortstatehash {
) = if since_shortstatehash . is_none ( ) {
let current_state = db . rooms . room_state_full ( & room_id ) ? ;
// Probably since = 0, we will do an initial sync
let current_members = current_state
let ( joined_member_count , invited_member_count , heroes ) = calculate_counts ( ) ;
let current_state_ids = db . rooms . state_full_ids ( current_shortstatehash ) ? ;
let state_events = current_state_ids
. iter ( )
. iter ( )
. filter ( | ( key , _ ) | key . 0 = = EventType ::RoomMember )
. map ( | id | db . rooms . get_pdu ( id ) )
. map ( | ( key , value ) | ( & key . 1 , value ) ) // Only keep state key
. filter_ map( | r | r . ok ( ) . flatten ( ) )
. collect ::< Vec < _ > > ( ) ;
. collect ::< Vec < _ > > ( ) ;
let encrypted_room = current_state
. get ( & ( EventType ::RoomEncryption , "" . to_owned ( ) ) )
. is_some ( ) ;
let since_state = since_shortstatehash
. as_ref ( )
. map ( | since_shortstatehash | {
since_shortstatehash
. map ( | since_shortstatehash | db . rooms . state_full ( since_shortstatehash ) )
. transpose ( )
} )
. transpose ( ) ? ;
let since_encryption = since_state . as_ref ( ) . map ( | state | {
(
state
heroes ,
. as_ref ( )
joined_member_count ,
. map ( | state | state . get ( & ( EventType ::RoomEncryption , "" . to_owned ( ) ) ) )
invited_member_count ,
} ) ;
true ,
state_events
)
} else if ! pdus_after_since | | since_shortstatehash = = Some ( current_shortstatehash ) {
// No state changes
( Vec ::new ( ) , None , None , false , Vec ::new ( ) )
} else {
// Incremental /sync
let since_shortstatehash = since_shortstatehash . unwrap ( ) ;
// Calculations:
let since_sender_member = db
let new_encrypted_room =
. rooms
encrypted_room & & since_encryption . map_or ( true , | encryption | encryption . is_none ( ) ) ;
. state_get (
since_shortstatehash ,
& EventType ::RoomMember ,
sender_user . as_str ( ) ,
) ?
. and_then ( | pdu | {
serde_json ::from_value ::< Raw < ruma ::events ::room ::member ::MemberEventContent > > (
pdu . content . clone ( ) ,
)
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) )
. ok ( )
} ) ;
let joined_since_last_sync = since_sender_member
. map_or ( true , | member | member . membership ! = MembershipState ::Join ) ;
let current_state_ids = db . rooms . state_full_ids ( current_shortstatehash ) ? ;
let since_state_ids = db . rooms . state_full_ids ( since_shortstatehash ) ? ;
let send_member_count = since_state . as_ref ( ) . map_or ( true , | since_state | {
let state_events = if joined_since_last_sync {
since_state . as_ref ( ) . map_or ( true , | since_state | {
current_state_ids
current_members . len ( )
. iter ( )
! = since_state
. map ( | id | db . rooms . get_pdu ( id ) )
. filter_map ( | r | r . ok ( ) . flatten ( ) )
. collect ::< Vec < _ > > ( )
} else {
current_state_ids
. difference ( & since_state_ids )
. filter ( | id | {
! timeline_pdus
. iter ( )
. iter ( )
. filter ( | ( key , _ ) | key . 0 = = EventType ::RoomMember )
. any ( | ( _ , timeline_pdu ) | timeline_pdu . event_id = = * * id )
. count ( )
} )
} )
. map ( | id | db . rooms . get_pdu ( id ) )
} ) ;
. filter_map ( | r | r . ok ( ) . flatten ( ) )
. collect ( )
} ;
let since_sender_member = since_state . as_ref ( ) . map ( | since_state | {
let encrypted_room = db
since_state . as_ref ( ) . and_then ( | state | {
. rooms
state
. state_get ( current_shortstatehash , & EventType ::RoomEncryption , "" ) ?
. get ( & ( EventType ::RoomMember , sender_user . as_str ( ) . to_owned ( ) ) )
. is_some ( ) ;
. and_then ( | pdu | {
serde_json ::from_value ::<
let since_encryption =
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
db . rooms
> ( pdu . content . clone ( ) )
. state_get ( since_shortstatehash , & EventType ::RoomEncryption , "" ) ? ;
. expect ( "Raw::from_value always works" )
. deserialize ( )
// Calculations:
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) )
let new_encrypted_room = encrypted_room & & since_encryption . is_none ( ) ;
. ok ( )
} )
let send_member_count = state_events
} )
. iter ( )
} ) ;
. any ( | event | event . kind = = EventType ::RoomMember ) ;
if encrypted_room {
if encrypted_room {
for ( user_id , current_member ) in current_members {
for ( user_id , current_member ) in db
. rooms
. room_members ( & room_id )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | user_id | {
db . rooms
. state_get (
current_shortstatehash ,
& EventType ::RoomMember ,
user_id . as_str ( ) ,
)
. ok ( )
. flatten ( )
. map ( | current_member | ( user_id , current_member ) )
} )
{
let current_membership = serde_json ::from_value ::<
let current_membership = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> ( current_member . content . clone ( ) )
> ( current_member . content . clone ( ) )
@ -313,31 +429,23 @@ async fn sync_helper(
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ?
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ?
. membership ;
. membership ;
let since_membership =
let since_membership = db
since_state
. rooms
. as_ref ( )
. state_get (
. map_or ( MembershipState ::Leave , | since_state | {
since_shortstatehash ,
since_state
& EventType ::RoomMember ,
. as_ref ( )
user_id . as_str ( ) ,
. and_then ( | since_state | {
) ?
since_state
. and_then ( | since_member | {
. get ( & ( EventType ::RoomMember , user_id . clone ( ) ) )
serde_json ::from_value ::<
. and_then ( | since_member | {
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
serde_json ::from_value ::<
> ( since_member . content . clone ( ) )
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
. expect ( "Raw::from_value always works" )
> (
. deserialize ( )
since_member . content . clone ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) )
)
. ok ( )
. expect ( "Raw::from_value always works" )
} )
. deserialize ( )
. map_or ( MembershipState ::Leave , | member | member . membership ) ;
. map_err ( | _ | {
Error ::bad_database ( "Invalid PDU in database." )
} )
. ok ( )
} )
} )
. map_or ( MembershipState ::Leave , | member | member . membership )
} ) ;
let user_id = UserId ::try_from ( user_id . clone ( ) )
let user_id = UserId ::try_from ( user_id . clone ( ) )
. map_err ( | _ | Error ::bad_database ( "Invalid UserId in member PDU." ) ) ? ;
. map_err ( | _ | Error ::bad_database ( "Invalid UserId in member PDU." ) ) ? ;
@ -359,10 +467,6 @@ async fn sync_helper(
}
}
}
}
let joined_since_last_sync = since_sender_member . map_or ( true , | member | {
member . map_or ( true , | member | member . membership ! = MembershipState ::Join )
} ) ;
if joined_since_last_sync & & encrypted_room | | new_encrypted_room {
if joined_since_last_sync & & encrypted_room | | new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
// If the user is in a new encrypted room, give them all joined users
device_list_updates . extend (
device_list_updates . extend (
@ -382,100 +486,11 @@ async fn sync_helper(
}
}
let ( joined_member_count , invited_member_count , heroes ) = if send_member_count {
let ( joined_member_count , invited_member_count , heroes ) = if send_member_count {
let joined_member_count = db . rooms . room_members ( & room_id ) . count ( ) ;
calculate_counts ( )
let invited_member_count = db . rooms . room_members_invited ( & room_id ) . count ( ) ;
// Recalculate heroes (first 5 members)
let mut heroes = Vec ::new ( ) ;
if joined_member_count + invited_member_count < = 5 {
// Go through all PDUs and for each member event, check if the user is still joined or
// invited until we have 5 or we reach the end
for hero in db
. rooms
. all_pdus ( & sender_user , & room_id )
. filter_map ( | pdu | pdu . ok ( ) ) // Ignore all broken pdus
. filter ( | ( _ , pdu ) | pdu . kind = = EventType ::RoomMember )
. map ( | ( _ , pdu ) | {
let content = serde_json ::from_value ::<
ruma ::events ::room ::member ::MemberEventContent ,
> ( pdu . content . clone ( ) )
. map_err ( | _ | {
Error ::bad_database ( "Invalid member event in database." )
} ) ? ;
if let Some ( state_key ) = & pdu . state_key {
let user_id =
UserId ::try_from ( state_key . clone ( ) ) . map_err ( | _ | {
Error ::bad_database ( "Invalid UserId in member PDU." )
} ) ? ;
// The membership was and still is invite or join
if matches! (
content . membership ,
MembershipState ::Join | MembershipState ::Invite
) & & ( db . rooms . is_joined ( & user_id , & room_id ) ?
| | db . rooms . is_invited ( & user_id , & room_id ) ? )
{
Ok ::< _ , Error > ( Some ( state_key . clone ( ) ) )
} else {
Ok ( None )
}
} else {
Ok ( None )
}
} )
// Filter out buggy users
. filter_map ( | u | u . ok ( ) )
// Filter for possible heroes
. flatten ( )
{
if heroes . contains ( & hero ) | | hero = = sender_user . as_str ( ) {
continue ;
}
heroes . push ( hero ) ;
}
}
(
Some ( joined_member_count ) ,
Some ( invited_member_count ) ,
heroes ,
)
} else {
} else {
( None , None , Vec ::new ( ) )
( None , None , Vec ::new ( ) )
} ;
} ;
let state_events = if joined_since_last_sync {
current_state
. iter ( )
. map ( | ( _ , pdu ) | pdu . to_sync_state_event ( ) )
. collect ( )
} else {
match since_state {
None = > Vec ::new ( ) ,
Some ( Some ( since_state ) ) = > current_state
. iter ( )
. filter ( | ( key , value ) | {
since_state . get ( key ) . map ( | e | & e . event_id ) ! = Some ( & value . event_id )
} )
. filter ( | ( _ , value ) | {
! timeline_pdus . iter ( ) . any ( | ( _ , timeline_pdu ) | {
timeline_pdu . kind = = value . kind
& & timeline_pdu . state_key = = value . state_key
} )
} )
. map ( | ( _ , pdu ) | pdu . to_sync_state_event ( ) )
. collect ( ) ,
Some ( None ) = > current_state
. iter ( )
. map ( | ( _ , pdu ) | pdu . to_sync_state_event ( ) )
. collect ( ) ,
}
} ;
(
(
heroes ,
heroes ,
joined_member_count ,
joined_member_count ,
@ -483,8 +498,6 @@ async fn sync_helper(
joined_since_last_sync ,
joined_since_last_sync ,
state_events ,
state_events ,
)
)
} else {
( Vec ::new ( ) , None , None , false , Vec ::new ( ) )
} ;
} ;
// Look for device list updates in this room
// Look for device list updates in this room
@ -575,7 +588,10 @@ async fn sync_helper(
events : room_events ,
events : room_events ,
} ,
} ,
state : sync_events ::State {
state : sync_events ::State {
events : state_events ,
events : state_events
. iter ( )
. map ( | pdu | pdu . to_sync_state_event ( ) )
. collect ( ) ,
} ,
} ,
ephemeral : sync_events ::Ephemeral { events : edus } ,
ephemeral : sync_events ::Ephemeral { events : edus } ,
} ;
} ;