use std ::{
collections ::{ BTreeMap , HashMap , HashSet } ,
convert ::{ TryFrom , TryInto } ,
fmt ::Debug ,
future ::Future ,
sync ::Arc ,
time ::{ Duration , Instant } ,
} ;
use crate ::{
appservice_server , database ::pusher , server_server , utils , Database , Error , PduEvent , Result ,
} ;
use federation ::transactions ::send_transaction_message ;
use ring ::digest ;
use rocket ::futures ::{
channel ::mpsc ,
stream ::{ FuturesUnordered , StreamExt } ,
} ;
use ruma ::{
api ::{
appservice ,
federation ::{
self ,
transactions ::edu ::{
DeviceListUpdateContent , Edu , ReceiptContent , ReceiptData , ReceiptMap ,
} ,
} ,
OutgoingRequest ,
} ,
device_id ,
events ::{ push_rules , AnySyncEphemeralRoomEvent , EventType } ,
push ,
receipt ::ReceiptType ,
uint , MilliSecondsSinceUnixEpoch , ServerName , UInt , UserId ,
} ;
use tokio ::{
select ,
sync ::{ oneshot , RwLock , Semaphore } ,
} ;
use tracing ::{ debug , error , warn } ;
use super ::abstraction ::Tree ;
/// The destination an outgoing batch of events is addressed to.
///
/// The variant also decides how database keys for the destination are laid
/// out (see `get_prefix`) and which transport `Sending::handle_events` uses.
#[ derive(Clone, Debug, PartialEq, Eq, Hash) ]
pub enum OutgoingKind {
/// An application service; `describe` reports the wrapped value as its ID,
/// and `handle_events` uses it to look up the appservice registration.
Appservice ( Box < ServerName > ) ,
/// A push target, stored as raw bytes: the user first, then the pushkey.
Push ( Vec < u8 > , Vec < u8 > ) , // user and pushkey
/// A remote Matrix server reached through a federation transaction.
Normal ( Box < ServerName > ) ,
}
impl OutgoingKind {
#[ tracing::instrument(skip(self)) ]
pub fn get_prefix ( & self ) -> Vec < u8 > {
let mut prefix = match self {
OutgoingKind ::Appservice ( server ) = > {
let mut p = b" + " . to_vec ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b" $ " . to_vec ( ) ;
p . extend_from_slice ( & user ) ;
p . push ( 0xff ) ;
p . extend_from_slice ( & pushkey ) ;
p
}
OutgoingKind ::Normal ( server ) = > {
let mut p = Vec ::new ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
} ;
prefix . push ( 0xff ) ;
prefix
}
fn describe ( & self ) -> std ::result ::Result < String , std ::string ::FromUtf8Error > {
Ok ( match self {
OutgoingKind ::Appservice ( appservice_id ) = > {
format! ( "Appservice (ID {})" , appservice_id )
}
OutgoingKind ::Push ( user , _ ) = > {
let user = utils ::string_from_bytes ( user ) ? ;
format! ( "User Push Service (for {})" , user )
}
OutgoingKind ::Normal ( server ) = > {
format! ( "Matrix Server ({})" , server )
}
} )
}
}
/// A single item of an outgoing batch.
///
/// `parse_servercurrentevent` decides between the two variants by looking at
/// the stored value: an empty value yields `Pdu`, a non-empty one yields `Edu`.
#[ derive(Clone, Debug, PartialEq, Eq, Hash) ]
pub enum SendingEventType {
/// A PDU, referenced by the id bytes taken from the tail of the database key.
Pdu ( Vec < u8 > ) ,
/// An EDU, carried as its stored bytes (deserialized as JSON when sent to a server).
Edu ( Vec < u8 > ) ,
}
/// State of the outgoing-event subsystem: the persistent queues, the request
/// limiter and the channels used to talk to the background handler task.
pub struct Sending {
/// Per-server EDU counter: server name bytes -> big-endian `u64` count of the
/// last EDU handed to that server (read by `select_edus`, written by
/// `prepare_transaction`).
pub ( super ) servername_educount : Arc < dyn Tree > , // EduCount: Count of last EDU sync
/// Events queued for a destination but not yet part of a transaction.
pub ( super ) servernameevent_data : Arc < dyn Tree > , // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
/// Events belonging to the transaction currently being sent (or retried).
pub ( super ) servercurrentevent_data : Arc < dyn Tree > , // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
/// Semaphore acquired around every outgoing request made by this module.
pub ( super ) maximum_requests : Arc < Semaphore > ,
/// Channel to the handler task carrying `(key, value)` of newly queued events.
pub sender : mpsc ::UnboundedSender < ( Vec < u8 > , Vec < u8 > ) > ,
/// Channel to the handler task asking it to cut a destination's retry timer short.
pub ( super ) waker : mpsc ::UnboundedSender < OutgoingKind > ,
}
/// Per-destination state tracked by the handler loop, keyed by the
/// destination's key prefix.
enum TransactionStatus {
// First attempt of a transaction is in flight (in the outgoing loop)
Running ,
// A retry of a previously failed transaction is in flight; `tries` is the
// number of failures so far
Retrying {
tries : u32 ,
} ,
// The last attempt failed and a retry timer is pending (in the timers loop)
Failed {
tries : u32 ,
// Sender half of the timer's oneshot channel; taken and fired by an
// explicit wake-up so the timer returns early
wake : Option < oneshot ::Sender < ( ) > > ,
} ,
}
/// Creates a retry timer that can be cut short.
///
/// Returns the sender half of a oneshot channel together with a future. The
/// future waits until `at` is reached or the receiver half of the channel
/// completes, whichever comes first, and then yields `outgoing` unchanged.
fn make_timeout(
    at: Instant,
    outgoing: Vec<u8>,
) -> (oneshot::Sender<()>, impl Future<Output = Vec<u8>>) {
    let (wake_tx, wake_rx) = oneshot::channel();
    let deadline = tokio::time::Instant::from_std(at);

    let timer = async move {
        // Which of the two events ended the wait does not matter to the caller.
        let _outcome = tokio::time::timeout_at(deadline, wake_rx).await;
        outgoing
    };

    (wake_tx, timer)
}
impl Sending {
/// Spawns the background task that drives all outgoing transactions.
///
/// `receiver` and `waker` are the receiving halves of the channels whose
/// sending halves are the `sender` and `waker` fields.
pub(super) fn start_handler(
    &self,
    db: Arc<RwLock<Database>>,
    receiver: mpsc::UnboundedReceiver<(Vec<u8>, Vec<u8>)>,
    waker: mpsc::UnboundedReceiver<OutgoingKind>,
) {
    let event_loop = Self::handler(db, receiver, waker);
    // The join handle is not kept: the task is never awaited from here.
    tokio::spawn(event_loop);
}
/// Event loop of the sending subsystem.
///
/// On start it re-submits the transactions still recorded in
/// `servercurrentevent_data`. Afterwards it loops over four event sources:
///
/// * `receiver`: newly queued events, started immediately if the destination is idle;
/// * `outgoing`: results of in-flight transactions;
/// * `timers`: retry timers of failed destinations;
/// * `waker`: explicit requests to end a destination's retry timer early.
///
/// `current_transaction_status` maps a destination's key prefix to its
/// `TransactionStatus`; a destination without an entry is idle.
async fn handler (
db : Arc < RwLock < Database > > ,
mut receiver : mpsc ::UnboundedReceiver < ( Vec < u8 > , Vec < u8 > ) > ,
mut waker : mpsc ::UnboundedReceiver < OutgoingKind > ,
) {
// In-flight transactions (futures returned by `handle_events`)
let mut outgoing = FuturesUnordered ::new ( ) ;
// Pending retry timers (futures returned by `generate_next_timer`)
let mut timers = FuturesUnordered ::new ( ) ;
let mut current_transaction_status = HashMap ::< Vec < u8 > , TransactionStatus > ::new ( ) ;
// Retry requests we could not finish yet
{
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < SendingEventType > > ::new ( ) ;
let guard = db . read ( ) . await ;
// Entries whose key cannot be parsed are silently skipped
for ( key , outgoing_kind , event ) in guard
. sending
. servercurrentevent_data
. iter ( )
. filter_map ( | ( key , v ) | {
Self ::parse_servercurrentevent ( & key , v )
. ok ( )
. map ( | ( k , e ) | ( key , k , e ) )
} )
{
let entry = initial_transactions
. entry ( outgoing_kind . clone ( ) )
. or_insert_with ( Vec ::new ) ;
// Cap the size of a resumed transaction: once a destination already has
// more than 30 events collected, further ones are deleted, not sent
if entry . len ( ) > 30 {
warn ! (
"Dropping some current events: {:?} {:?} {:?}" ,
key , outgoing_kind , event
) ;
guard . sending . servercurrentevent_data . remove ( & key ) . unwrap ( ) ;
continue ;
}
entry . push ( event ) ;
}
// Release the read lock before starting the requests
drop ( guard ) ;
for ( outgoing_kind , events ) in initial_transactions {
current_transaction_status
. insert ( outgoing_kind . get_prefix ( ) , TransactionStatus ::Running ) ;
outgoing . push ( Self ::handle_events (
outgoing_kind . clone ( ) ,
events ,
Arc ::clone ( & db ) ,
) ) ;
}
}
loop {
select ! {
// New transactions to be sent out (from server/user activity)
Some ( ( key , value ) ) = receiver . next ( ) = > {
match Self ::parse_servercurrentevent ( & key , value ) {
Ok ( ( outgoing_kind , event ) ) = > {
let guard = db . read ( ) . await ;
// `retry == false`: yields Ok(None) when the destination already has a
// status entry; the event then stays queued in `servernameevent_data`
match Self ::prepare_transaction (
& outgoing_kind ,
vec! [ ( event , key ) ] ,
& mut current_transaction_status ,
& guard ,
false ,
) {
Ok ( Some ( events ) ) = > {
outgoing . push ( Self ::handle_events ( outgoing_kind , events , Arc ::clone ( & db ) ) ) ;
}
Ok ( None ) = > {
debug ! ( "prepare_transaction produced not ready for {:?} when receiving event" , outgoing_kind )
}
Err ( err ) = > {
error ! ( "prepare_transaction produced an error for {:?}; {}" , outgoing_kind , err )
}
}
} ,
Err ( err ) = > {
error ! ( "parse_servercurrentevent produced an error for {:?}; {}" , key , err )
}
}
} ,
// Result of outgoing transactions
Some ( response ) = outgoing . next ( ) = > {
match response {
// Outgoing transaction succeeded
Ok ( outgoing_kind ) = > {
let guard = db . read ( ) . await ;
let sending = & guard . sending ;
// Clear all outgoing events that were just sent out.
let prefix = outgoing_kind . get_prefix ( ) ;
for ( key , _ ) in sending . servercurrentevent_data
. scan_prefix ( prefix . clone ( ) )
{
sending . servercurrentevent_data . remove ( & key ) . unwrap ( ) ;
}
// Find events that have been added since starting the last request
// (at most 30 of them go into the next transaction)
let new_events = sending . servernameevent_data
. scan_prefix ( prefix . clone ( ) )
. filter_map ( | ( k , v ) | {
Self ::parse_servercurrentevent ( & k , v ) . ok ( ) . map ( | ev | ( ev , k ) )
} )
. take ( 30 )
. collect ::< Vec < _ > > ( ) ;
// TODO: find edus
if ! new_events . is_empty ( ) {
// Move the events we found from the queue into the current transaction;
// EDUs keep their content as value, PDUs are stored with an empty value
for ( e , key ) in & new_events {
let value = if let SendingEventType ::Edu ( value ) = & e . 1 { & * * value } else { & [ ] } ;
sending . servercurrentevent_data . insert ( & key , value ) . unwrap ( ) ;
sending . servernameevent_data . remove ( & key ) . unwrap ( ) ;
}
// Clear retries
current_transaction_status . insert ( prefix , TransactionStatus ::Running ) ;
outgoing . push (
Self ::handle_events (
outgoing_kind . clone ( ) ,
new_events . into_iter ( ) . map ( | ( event , _ ) | event . 1 ) . collect ( ) ,
Arc ::clone ( & db ) ,
)
) ;
} else {
// Nothing left to send: the destination becomes idle again
current_transaction_status . remove ( & prefix ) ;
}
}
// Outgoing transaction failed
Err ( ( outgoing_kind , err ) ) = > {
// Set status to Failed, create timer
let timer = Self ::generate_next_timer ( & mut current_transaction_status , outgoing_kind . get_prefix ( ) ) ;
// Add timer to loop
timers . push ( timer ) ;
let destination = match outgoing_kind . describe ( ) {
Ok ( d ) = > d ,
Err ( err ) = > {
warn ! ( "Could not describe {:?}: {}" , outgoing_kind , err ) ;
// Skips only the failure log below; the timer is already registered
continue ;
} ,
} ;
warn ! ( "Outgoing request to {} failed: {}" , destination , err ) ;
}
} ;
} ,
// Transaction retry timers firing
Some ( prefix ) = timers . next ( ) = > {
// The timer yields the destination prefix; parsing it with an empty value
// recovers the `OutgoingKind` (the parsed event part is discarded)
match Self ::parse_servercurrentevent ( & prefix , vec! [ ] ) {
Ok ( ( outgoing_kind , _ ) ) = > {
let guard = db . read ( ) . await ;
// Transition Failed => Retrying, return pending old transaction events
match Self ::prepare_transaction (
& outgoing_kind ,
vec! [ ] , // will be ignored because retry == true anyways
& mut current_transaction_status ,
& guard ,
true ,
) {
Ok ( Some ( events ) ) = > {
outgoing . push ( Self ::handle_events ( outgoing_kind , events , Arc ::clone ( & db ) ) ) ;
}
Ok ( None ) = > {
// Unreachable because retry == true
unreachable! ( "prepare_transaction went over to Ok(None), while being fired by timer (for {:?})" , outgoing_kind )
}
Err ( err ) = > {
error ! ( "prepare_transaction produced an error for {:?}; {}" , outgoing_kind , err )
}
}
} ,
Err ( err ) = > {
error ! ( "parse_servercurrentevent produced an error for {:?}; {}" , prefix , err )
}
}
} ,
// Explicit wakeups, makes an in-flight timer return immediately
// (note: `outgoing` here shadows the futures set within this branch)
Some ( outgoing ) = waker . next ( ) = > {
let prefix = outgoing . get_prefix ( ) ;
// Only destinations in the Failed state have a timer to wake
if let Some ( TransactionStatus ::Failed { wake , .. } ) = current_transaction_status . get_mut ( & prefix ) {
if let Some ( wake ) = wake . take ( ) {
let _ = wake . send ( ( ) ) ;
}
}
} ,
}
}
}
/// Records a failed attempt for `prefix` and schedules the next retry.
///
/// The status entry is moved to `Failed` with an incremented try count and a
/// fresh wake-up sender. The returned future resolves to `prefix` once the
/// retry delay has passed or the timer is woken early.
///
/// The delay grows quadratically, `30s * tries * tries`, capped at one day.
///
/// # Panics
///
/// Panics if `status` has no entry for `prefix`, or if the entry is already
/// `Failed` (a destination without an in-flight request cannot fail).
fn generate_next_timer(
    status: &mut HashMap<Vec<u8>, TransactionStatus>,
    prefix: Vec<u8>,
) -> impl Future<Output = Vec<u8>> {
    const BASE_DELAY: Duration = Duration::from_secs(30);
    const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);

    let now = Instant::now();
    let current = status
        .get_mut(&prefix)
        .expect("guaranteed to be set before this function");

    let new_tries = match current {
        // First attempt failed
        TransactionStatus::Running => 1,
        // A retry failed again
        TransactionStatus::Retrying { tries } => *tries + 1,
        // Failed -> Retrying happens in the timer branch of the loop before a
        // request is started, so a failure can never arrive in this state
        TransactionStatus::Failed { .. } => {
            unreachable!("Request that was not even running failed?!")
        }
    };

    let delay = (BASE_DELAY * new_tries * new_tries).min(ONE_DAY);
    let (wake, timer) = make_timeout(now + delay, prefix);

    *current = TransactionStatus::Failed {
        tries: new_tries,
        wake: Some(wake),
    };
    timer
}
/// Asks the handler task to end the retry timer of `outgoing` right away.
///
/// The request is best effort: a failure to enqueue it is ignored.
pub fn wake_outgoing(&self, outgoing: OutgoingKind) {
    let request = self.waker.unbounded_send(outgoing);
    drop(request);
}
#[ tracing::instrument(skip(outgoing_kind, new_events, status, db)) ]
fn prepare_transaction (
outgoing_kind : & OutgoingKind ,
new_events : Vec < ( SendingEventType , Vec < u8 > ) > , // Events we want to send: event and full key
status : & mut HashMap < Vec < u8 > , TransactionStatus > ,
db : & Database ,
retry : bool ,
) -> Result < Option < Vec < SendingEventType > > > {
let prefix = outgoing_kind . get_prefix ( ) ;
// If we're not retrying, and there's already an entry in the hashmap (retrying, failed, or running), bail.
//
// Transitioning from Failed to Retrying is not this function's responsibility, it's the timer loop's job.
//
// When retrying, it's asserted that the hashmap exists and has Failed, if it doesn't, its an error state.
if ! retry & & status . get ( & prefix ) . is_some ( ) {
return Ok ( None ) ;
} else if retry {
// Retrying, asserting hashmap is Failed, bail otherwise.
let tries : u32 = match status . get ( & prefix ) {
Some ( txn_status ) = > match txn_status {
TransactionStatus ::Failed { tries , .. } = > Ok ( tries . clone ( ) ) ,
_ = > Err ( Error ::bad_state ( "state hashmap entry was not Failed" ) ) ,
} ,
None = > Err ( Error ::bad_state ( "state hashmap entry was absent" ) ) ,
} ? ;
// When transitioning Retrying => Failed, tries was already incremented, no need to do it here.
status . insert ( prefix . clone ( ) , TransactionStatus ::Retrying { tries } ) ;
} else {
// Not retrying, hashmap didnt exist
status . insert ( prefix . clone ( ) , TransactionStatus ::Running ) ;
}
let mut events = Vec ::new ( ) ;
if retry {
// We retry the previous transaction
for ( key , value ) in db . sending . servercurrentevent_data . scan_prefix ( prefix ) {
if let Ok ( ( _ , e ) ) = Self ::parse_servercurrentevent ( & key , value ) {
events . push ( e ) ;
}
}
} else {
for ( e , full_key ) in new_events {
let value = if let SendingEventType ::Edu ( value ) = & e {
& * * value
} else {
& [ ] [ .. ]
} ;
db . sending
. servercurrentevent_data
. insert ( & full_key , value ) ? ;
// If it was a PDU we have to unqueue it
// TODO: don't try to unqueue EDUs
db . sending . servernameevent_data . remove ( & full_key ) ? ;
events . push ( e ) ;
}
if let OutgoingKind ::Normal ( server_name ) = outgoing_kind {
if let Ok ( ( select_edus , last_count ) ) = Self ::select_edus ( db , server_name ) {
events . extend ( select_edus . into_iter ( ) . map ( SendingEventType ::Edu ) ) ;
db . sending
. servername_educount
. insert ( server_name . as_bytes ( ) , & last_count . to_be_bytes ( ) ) ? ;
}
}
}
Ok ( Some ( events ) )
}
#[ tracing::instrument(skip(db, server)) ]
pub fn select_edus ( db : & Database , server : & ServerName ) -> Result < ( Vec < Vec < u8 > > , u64 ) > {
// u64: count of last edu
let since = db
. sending
. servername_educount
. get ( server . as_bytes ( ) ) ?
. map_or ( Ok ( 0 ) , | bytes | {
utils ::u64_from_bytes ( & bytes )
. map_err ( | _ | Error ::bad_database ( "Invalid u64 in servername_educount." ) )
} ) ? ;
let mut events = Vec ::new ( ) ;
let mut max_edu_count = since ;
let mut device_list_changes = HashSet ::new ( ) ;
' outer : for room_id in db . rooms . server_rooms ( server ) {
let room_id = room_id ? ;
// Look for device list updates in this room
device_list_changes . extend (
db . users
. keys_changed ( & room_id . to_string ( ) , since , None )
. filter_map ( | r | r . ok ( ) )
. filter ( | user_id | user_id . server_name ( ) = = db . globals . server_name ( ) ) ,
) ;
// Look for read receipts in this room
for r in db . rooms . edus . readreceipts_since ( & room_id , since ) {
let ( user_id , count , read_receipt ) = r ? ;
if count > max_edu_count {
max_edu_count = count ;
}
if user_id . server_name ( ) ! = db . globals . server_name ( ) {
continue ;
}
let event =
serde_json ::from_str ::< AnySyncEphemeralRoomEvent > ( & read_receipt . json ( ) . get ( ) )
. map_err ( | _ | Error ::bad_database ( "Invalid edu event in read_receipts." ) ) ? ;
let federation_event = match event {
AnySyncEphemeralRoomEvent ::Receipt ( r ) = > {
let mut read = BTreeMap ::new ( ) ;
let ( event_id , mut receipt ) = r
. content
. 0
. into_iter ( )
. next ( )
. expect ( "we only use one event per read receipt" ) ;
let receipt = receipt
. remove ( & ReceiptType ::Read )
. expect ( "our read receipts always set this" )
. remove ( & user_id )
. expect ( "our read receipts always have the user here" ) ;
read . insert (
user_id ,
ReceiptData {
data : receipt . clone ( ) ,
event_ids : vec ! [ event_id . clone ( ) ] ,
} ,
) ;
let receipt_map = ReceiptMap { read } ;
let mut receipts = BTreeMap ::new ( ) ;
receipts . insert ( room_id . clone ( ) , receipt_map ) ;
Edu ::Receipt ( ReceiptContent { receipts } )
}
_ = > {
Error ::bad_database ( "Invalid event type in read_receipts" ) ;
continue ;
}
} ;
events . push ( serde_json ::to_vec ( & federation_event ) . expect ( "json can be serialized" ) ) ;
if events . len ( ) > = 20 {
break 'outer ;
}
}
}
for user_id in device_list_changes {
// Empty prev id forces synapse to resync: https://github.com/matrix-org/synapse/blob/98aec1cc9da2bd6b8e34ffb282c85abf9b8b42ca/synapse/handlers/device.py#L767
// Because synapse resyncs, we can just insert dummy data
let edu = Edu ::DeviceListUpdate ( DeviceListUpdateContent {
user_id ,
device_id : device_id ! ( "dummy" ) ,
device_display_name : "Dummy" . to_owned ( ) ,
stream_id : uint ! ( 1 ) ,
prev_id : Vec ::new ( ) ,
deleted : None ,
keys : None ,
} ) ;
events . push ( serde_json ::to_vec ( & edu ) . expect ( "json can be serialized" ) ) ;
}
Ok ( ( events , max_edu_count ) )
}
/// Queues the PDU `pdu_id` for delivery to the push target `senderkey` and
/// notifies the handler task.
///
/// The queue key is the push marker, `senderkey`, a `0xff` separator and the
/// PDU id; the stored value is empty.
///
/// # Panics
///
/// Panics if the notification cannot be sent to the handler task.
#[tracing::instrument(skip(self, pdu_id, senderkey))]
pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: Vec<u8>) -> Result<()> {
    let key = [&b" $ "[..], &senderkey[..], &[0xff][..], pdu_id].concat();

    self.servernameevent_data.insert(&key, &[])?;
    self.sender.unbounded_send((key, Vec::new())).unwrap();

    Ok(())
}
/// Queues the PDU `pdu_id` for federation to `server` and notifies the
/// handler task.
///
/// The queue key is the server name, a `0xff` separator and the PDU id; the
/// stored value is empty.
///
/// # Panics
///
/// Panics if the notification cannot be sent to the handler task.
#[tracing::instrument(skip(self, server, pdu_id))]
pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> {
    let key = [server.as_bytes(), &[0xff][..], pdu_id].concat();

    self.servernameevent_data.insert(&key, &[])?;
    self.sender.unbounded_send((key, Vec::new())).unwrap();

    Ok(())
}
/// Queues an already serialized EDU for federation to `server` and notifies
/// the handler task.
///
/// The queue key is the server name, a `0xff` separator and `id` as eight
/// big-endian bytes; the stored value is the EDU content itself.
///
/// # Panics
///
/// Panics if the notification cannot be sent to the handler task.
#[tracing::instrument(skip(self, server, serialized))]
pub fn send_reliable_edu(
    &self,
    server: &ServerName,
    serialized: Vec<u8>,
    id: u64,
) -> Result<()> {
    let id_bytes = id.to_be_bytes();
    let key = [server.as_bytes(), &[0xff][..], &id_bytes[..]].concat();

    self.servernameevent_data.insert(&key, &serialized)?;
    self.sender.unbounded_send((key, serialized)).unwrap();

    Ok(())
}
/// Queues the PDU `pdu_id` for delivery to the appservice `appservice_id` and
/// notifies the handler task.
///
/// The queue key is the appservice marker, the appservice id, a `0xff`
/// separator and the PDU id; the stored value is empty.
///
/// # Panics
///
/// Panics if the notification cannot be sent to the handler task.
#[tracing::instrument(skip(self))]
pub fn send_pdu_appservice(&self, appservice_id: &str, pdu_id: &[u8]) -> Result<()> {
    let key = [
        &b" + "[..],
        appservice_id.as_bytes(),
        &[0xff][..],
        pdu_id,
    ]
    .concat();

    self.servernameevent_data.insert(&key, &[])?;
    self.sender.unbounded_send((key, Vec::new())).unwrap();

    Ok(())
}
/// Computes the SHA-256 digest of `keys` joined with `0xff` separators.
///
/// Callers pass the event ids (or EDU bytes) of a batch, so the digest depends
/// only on which events the batch contains, not on the full PDUs.
#[tracing::instrument(skip(keys))]
fn calculate_hash(keys: &[&[u8]]) -> Vec<u8> {
    let joined = keys.join(&0xff);
    digest::digest(&digest::SHA256, &joined).as_ref().to_vec()
}
/// Sends one batch of `events` to the destination `kind`.
///
/// Returns `Ok(kind)` when the batch counts as delivered, or `Err((kind, error))`
/// so that the caller can schedule a retry for that destination.
///
/// The database read lock taken on the first line is held until the function
/// returns, i.e. across the outgoing request(s). Every outgoing request is
/// made while holding a permit of `maximum_requests`.
#[ tracing::instrument(skip(db, events, kind)) ]
async fn handle_events (
kind : OutgoingKind ,
events : Vec < SendingEventType > ,
db : Arc < RwLock < Database > > ,
) -> std ::result ::Result < OutgoingKind , ( OutgoingKind , Error ) > {
let db = db . read ( ) . await ;
match & kind {
// Appservices receive all PDUs of the batch in a single push_events request
OutgoingKind ::Appservice ( server ) = > {
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
// A missing PDU or a database error fails the whole batch
pdu_jsons . push ( db . rooms
. get_pdu_from_id ( & pdu_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
kind . clone ( ) ,
Error ::bad_database (
"[Appservice] Event in servernameevent_data not found in db." ,
) ,
)
} ) ?
. to_room_event ( ) )
}
SendingEventType ::Edu ( _ ) = > {
// Appservices don't need EDUs (?)
}
}
}
let permit = db . sending . maximum_requests . acquire ( ) . await ;
let response = appservice_server ::send_request (
& db . globals ,
// NOTE(review): panics if the registration lookup fails or finds nothing
db . appservice
. get_registration ( server . as_str ( ) )
. unwrap ( )
. unwrap ( ) , // TODO: handle error
appservice ::event ::push_events ::v1 ::Request {
events : & pdu_jsons ,
// Transaction id: URL-safe unpadded base64 of `calculate_hash` over the
// bytes of every event in the batch (EDUs included, although none are sent)
txn_id : & base64 ::encode_config (
Self ::calculate_hash (
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
base64 ::URL_SAFE_NO_PAD ,
) ,
} ,
)
. await
. map ( | _response | kind . clone ( ) )
. map_err ( | e | ( kind , e ) ) ;
drop ( permit ) ;
response
}
// Push targets get one push notice per PDU
OutgoingKind ::Push ( user , pushkey ) = > {
let mut pdus = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
// A missing PDU or a database error fails the whole batch
pdus . push (
db . rooms
. get_pdu_from_id ( & pdu_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
kind . clone ( ) ,
Error ::bad_database (
"[Push] Event in servernamevent_datas not found in db." ,
) ,
)
} ) ? ,
) ;
}
SendingEventType ::Edu ( _ ) = > {
// Push gateways don't need EDUs (?)
}
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu . unsigned . get ( "redacted_because" ) . is_some ( ) {
continue ;
}
// The user is stored as raw bytes and has to be parsed for every PDU
let userid =
UserId ::try_from ( utils ::string_from_bytes ( user ) . map_err ( | _ | {
(
kind . clone ( ) ,
Error ::bad_database ( "Invalid push user string in db." ) ,
)
} ) ? )
. map_err ( | _ | {
(
kind . clone ( ) ,
Error ::bad_database ( "Invalid push user id in db." ) ,
)
} ) ? ;
// Pusher lookup key: user bytes, 0xff, pushkey bytes
let mut senderkey = user . clone ( ) ;
senderkey . push ( 0xff ) ;
senderkey . extend_from_slice ( pushkey ) ;
// A pusher that no longer exists is skipped without failing the batch
let pusher = match db
. pusher
. get_pusher ( & senderkey )
. map_err ( | e | ( OutgoingKind ::Push ( user . clone ( ) , pushkey . clone ( ) ) , e ) ) ?
{
Some ( pusher ) = > pusher ,
None = > continue ,
} ;
// Use the user's global push rules; fall back to the server default
// ruleset when none are stored or the lookup fails
let rules_for_user = db
. account_data
. get ::< push_rules ::PushRulesEvent > ( None , & userid , EventType ::PushRules )
. unwrap_or_default ( )
. map ( | ev | ev . content . global )
. unwrap_or_else ( | | push ::Ruleset ::server_default ( & userid ) ) ;
let unread : UInt = db
. rooms
. notification_count ( & userid , & pdu . room_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. try_into ( )
. expect ( "notifiation count can't go that high" ) ;
let permit = db . sending . maximum_requests . acquire ( ) . await ;
// NOTE(review): the result is discarded, so a failed push notice does not
// fail the batch and is never retried
let _response = pusher ::send_push_notice (
& userid ,
unread ,
& pusher ,
rules_for_user ,
& pdu ,
& db ,
)
. await
. map ( | _response | kind . clone ( ) )
. map_err ( | e | ( kind . clone ( ) , e ) ) ;
drop ( permit ) ;
}
Ok ( OutgoingKind ::Push ( user . clone ( ) , pushkey . clone ( ) ) )
}
// Remote servers receive PDUs and EDUs in one federation transaction
OutgoingKind ::Normal ( server ) = > {
let mut edu_jsons = Vec ::new ( ) ;
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
// TODO: check room version and remove event_id if needed
let raw = PduEvent ::convert_to_outgoing_federation_event (
db . rooms
. get_pdu_json_from_id ( & pdu_id )
. map_err ( | e | ( OutgoingKind ::Normal ( server . clone ( ) ) , e ) ) ?
. ok_or_else ( | | {
(
OutgoingKind ::Normal ( server . clone ( ) ) ,
Error ::bad_database (
"[Normal] Event in servernamevent_datas not found in db." ,
) ,
)
} ) ? ,
) ;
pdu_jsons . push ( raw ) ;
}
SendingEventType ::Edu ( edu ) = > {
// EDUs that fail to deserialize are silently left out
if let Ok ( raw ) = serde_json ::from_slice ( edu ) {
edu_jsons . push ( raw ) ;
}
}
}
}
let permit = db . sending . maximum_requests . acquire ( ) . await ;
let response = server_server ::send_request (
& db . globals ,
& * server ,
send_transaction_message ::v1 ::Request {
origin : db . globals . server_name ( ) ,
pdus : & pdu_jsons ,
edus : & edu_jsons ,
origin_server_ts : MilliSecondsSinceUnixEpoch ::now ( ) ,
// Transaction id: URL-safe unpadded base64 of `calculate_hash` over the
// bytes of every event in the batch
transaction_id : & base64 ::encode_config (
Self ::calculate_hash (
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
base64 ::URL_SAFE_NO_PAD ,
) ,
} ,
)
. await
. map ( | response | {
// Per-PDU errors reported by the remote server are only logged; the
// transaction as a whole still counts as delivered
for pdu in response . pdus {
if pdu . 1. is_err ( ) {
warn ! ( "Failed to send to {}: {:?}" , server , pdu ) ;
}
}
kind . clone ( )
} )
. map_err ( | e | ( kind , e ) ) ;
drop ( permit ) ;
response
}
}
}
#[ tracing::instrument(skip(key)) ]
fn parse_servercurrentevent (
key : & [ u8 ] ,
value : Vec < u8 > ,
) -> Result < ( OutgoingKind , SendingEventType ) > {
// Appservices start with a plus
Ok ::< _ , Error > ( if key . starts_with ( b" + " ) {
let mut parts = key [ 1 .. ] . splitn ( 2 , | & b | b = = 0xff ) ;
let server = parts . next ( ) . expect ( "splitn always returns one element" ) ;
let event = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
let server = utils ::string_from_bytes ( & server ) . map_err ( | _ | {
Error ::bad_database ( "Invalid server bytes in server_currenttransaction" )
} ) ? ;
(
OutgoingKind ::Appservice ( Box ::< ServerName > ::try_from ( server ) . map_err ( | _ | {
Error ::bad_database ( "Invalid server string in server_currenttransaction" )
} ) ? ) ,
if value . is_empty ( ) {
SendingEventType ::Pdu ( event . to_vec ( ) )
} else {
SendingEventType ::Edu ( value )
} ,
)
} else if key . starts_with ( b" $ " ) {
let mut parts = key [ 1 .. ] . splitn ( 3 , | & b | b = = 0xff ) ;
let user = parts . next ( ) . expect ( "splitn always returns one element" ) ;
let pushkey = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
let event = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
(
OutgoingKind ::Push ( user . to_vec ( ) , pushkey . to_vec ( ) ) ,
if value . is_empty ( ) {
SendingEventType ::Pdu ( event . to_vec ( ) )
} else {
SendingEventType ::Edu ( value )
} ,
)
} else {
let mut parts = key . splitn ( 2 , | & b | b = = 0xff ) ;
let server = parts . next ( ) . expect ( "splitn always returns one element" ) ;
let event = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
let server = utils ::string_from_bytes ( & server ) . map_err ( | _ | {
Error ::bad_database ( "Invalid server bytes in server_currenttransaction" )
} ) ? ;
(
OutgoingKind ::Normal ( Box ::< ServerName > ::try_from ( server ) . map_err ( | _ | {
Error ::bad_database ( "Invalid server string in server_currenttransaction" )
} ) ? ) ,
if value . is_empty ( ) {
SendingEventType ::Pdu ( event . to_vec ( ) )
} else {
SendingEventType ::Edu ( value )
} ,
)
} )
}
/// Sends a single federation request to `destination`, limited by the
/// `maximum_requests` semaphore.
///
/// The permit is acquired before the request starts and released once the
/// response (or error) is available.
#[tracing::instrument(skip(self, globals, destination, request))]
pub async fn send_federation_request<T: OutgoingRequest>(
    &self,
    globals: &crate::database::globals::Globals,
    destination: &ServerName,
    request: T,
) -> Result<T::IncomingResponse>
where
    T: Debug,
{
    // Held for the duration of the request; released when the function returns.
    let _permit = self.maximum_requests.acquire().await;
    server_server::send_request(globals, destination, request).await
}
/// Sends a single request to the appservice described by `registration`,
/// limited by the `maximum_requests` semaphore.
///
/// The permit is acquired before the request starts and released once the
/// response (or error) is available.
#[tracing::instrument(skip(self, globals, registration, request))]
pub async fn send_appservice_request<T: OutgoingRequest>(
    &self,
    globals: &crate::database::globals::Globals,
    registration: serde_yaml::Value,
    request: T,
) -> Result<T::IncomingResponse>
where
    T: Debug,
{
    // Held for the duration of the request; released when the function returns.
    let _permit = self.maximum_requests.acquire().await;
    appservice_server::send_request(globals, registration, request).await
}
}