From 40ffc9ac02897bfb7ea60db54f3cd36657956b71 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 1 Sep 2021 17:47:57 +0200 Subject: [PATCH] make federation retry timer-based --- src/database.rs | 4 +- src/database/sending.rs | 365 +++++++++++++++++++++++++++++----------- src/error.rs | 9 + 3 files changed, 276 insertions(+), 102 deletions(-) diff --git a/src/database.rs b/src/database.rs index 6ea0abd..0880bc6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -206,6 +206,7 @@ impl Database { let (admin_sender, admin_receiver) = mpsc::unbounded(); let (sending_sender, sending_receiver) = mpsc::unbounded(); + let (waking_sender, waking_receiver) = mpsc::unbounded(); let db = Arc::new(TokioRwLock::from(Self { _db: builder.clone(), @@ -323,6 +324,7 @@ impl Database { servercurrentevent_data: builder.open_tree("servercurrentevent_data")?, maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), sender: sending_sender, + waker: waking_sender, }, admin: admin::Admin { sender: admin_sender, @@ -746,7 +748,7 @@ impl Database { guard.admin.start_handler(Arc::clone(&db), admin_receiver); guard .sending - .start_handler(Arc::clone(&db), sending_receiver); + .start_handler(Arc::clone(&db), sending_receiver, waking_receiver); drop(guard); diff --git a/src/database/sending.rs b/src/database/sending.rs index 1050c07..637a83e 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -2,6 +2,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, convert::{TryFrom, TryInto}, fmt::Debug, + future::Future, sync::Arc, time::{Duration, Instant}, }; @@ -34,9 +35,9 @@ use ruma::{ }; use tokio::{ select, - sync::{RwLock, Semaphore}, + sync::{oneshot, RwLock, Semaphore}, }; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; use super::abstraction::Tree; @@ -73,6 +74,21 @@ impl OutgoingKind { prefix } + + fn describe(&self) -> std::result::Result { + Ok(match self { + OutgoingKind::Appservice(appservice_id) => { + format!("Appservice (ID {})", appservice_id) + } + OutgoingKind::Push(user, _) => { + let user = utils::string_from_bytes(user)?; + format!("User Push Service (for {})", user) + } + OutgoingKind::Normal(server) => { + format!("Matrix Server ({})", server) + } + }) + } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -88,26 +104,59 @@ pub struct Sending { pub(super) servercurrentevent_data: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content pub(super) maximum_requests: Arc, pub sender: mpsc::UnboundedSender<(Vec, Vec)>, + pub(super) waker: mpsc::UnboundedSender, } enum TransactionStatus { + // Currently running, in outgoing loop Running, - Failed(u32, Instant), // number of times failed, time of last failure - Retrying(u32), // number of times failed + Retrying { + tries: u32, + }, + // Retrying, in timers loop + Failed { + tries: u32, + wake: Option>, + }, +} + +fn make_timeout( + at: Instant, + outgoing: Vec, +) -> (oneshot::Sender<()>, impl Future>) { + let (tx, rx) = oneshot::channel(); + let at = tokio::time::Instant::from_std(at); + + (tx, async move { + let _ = tokio::time::timeout_at(at, rx).await; + + outgoing + }) } impl Sending { - pub fn start_handler( + pub(super) fn start_handler( &self, + db: Arc>, + receiver: mpsc::UnboundedReceiver<(Vec, Vec)>, + waker: mpsc::UnboundedReceiver, + ) { + tokio::spawn(Self::handler(db, receiver, waker)); + } + + async fn handler( db: Arc>, mut receiver: mpsc::UnboundedReceiver<(Vec, Vec)>, + mut waker: mpsc::UnboundedReceiver, ) { - tokio::spawn(async move { - let mut futures = FuturesUnordered::new(); + let mut outgoing = FuturesUnordered::new(); + + let mut timers = FuturesUnordered::new(); - let mut current_transaction_status = HashMap::, TransactionStatus>::new(); + let mut current_transaction_status = HashMap::, TransactionStatus>::new(); - // Retry requests we could not finish yet + // Retry requests we could not finish yet + { let mut initial_transactions = HashMap::>::new(); let guard = db.read().await; @@ -143,127 +192,241 @@ impl Sending { for (outgoing_kind, events) in initial_transactions { current_transaction_status .insert(outgoing_kind.get_prefix(), TransactionStatus::Running); - futures.push(Self::handle_events( + outgoing.push(Self::handle_events( outgoing_kind.clone(), events, Arc::clone(&db), )); } + } + + loop { + select! { + // New transactions to be sent out (from server/user activity) + Some((key, value)) = receiver.next() => { + match Self::parse_servercurrentevent(&key, value) { + Ok((outgoing_kind, event)) => { + let guard = db.read().await; - loop { - select! { - Some(response) = futures.next() => { - match response { - Ok(outgoing_kind) => { - let guard = db.read().await; - - let prefix = outgoing_kind.get_prefix(); - for (key, _) in guard.sending.servercurrentevent_data - .scan_prefix(prefix.clone()) - { - guard.sending.servercurrentevent_data.remove(&key).unwrap(); + match Self::prepare_transaction( + &outgoing_kind, + vec![(event, key)], + &mut current_transaction_status, + &guard, + false, + ) { + Ok(Some(events)) => { + outgoing.push(Self::handle_events(outgoing_kind, events, Arc::clone(&db))); + } + Ok(None) => { + debug!("prepare_transaction produced not ready for {:?} when receiving event", outgoing_kind) } - // Find events that have been added since starting the last request - let new_events = guard.sending.servernameevent_data - .scan_prefix(prefix.clone()) - .filter_map(|(k, v)| { - Self::parse_servercurrentevent(&k, v).ok().map(|ev| (ev, k)) - }) - .take(30) - .collect::>(); - - // TODO: find edus - - if !new_events.is_empty() { - // Insert pdus we found - for (e, key) in &new_events { - let value = if let SendingEventType::Edu(value) = &e.1 { &**value } else { &[] }; - guard.sending.servercurrentevent_data.insert(&key, value).unwrap(); - guard.sending.servernameevent_data.remove(&key).unwrap(); - } - - drop(guard); - - futures.push( - Self::handle_events( - outgoing_kind.clone(), - new_events.into_iter().map(|(event, _)| event.1).collect(), - Arc::clone(&db), - ) - ); - } else { - current_transaction_status.remove(&prefix); + Err(err) => { + error!("prepare_transaction produced an error for {:?}; {}", outgoing_kind, err) } } - Err((outgoing_kind, _)) => { - current_transaction_status.entry(outgoing_kind.get_prefix()).and_modify(|e| *e = match e { - TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), - TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), - TransactionStatus::Failed(_, _) => { - error!("Request that was not even running failed?!"); - return - }, - }); + }, + Err(err) => { + error!("parse_servercurrentevent produced an error for {:?}; {}", key, err) + } + } + }, + + // Result of outgoing transactions + Some(response) = outgoing.next() => { + match response { + + // Outgoing transaction succeeded + Ok(outgoing_kind) => { + let guard = db.read().await; + let sending = &guard.sending; + + // Clear all outgoing events that were just sent out. + let prefix = outgoing_kind.get_prefix(); + for (key, _) in sending.servercurrentevent_data + .scan_prefix(prefix.clone()) + { + sending.servercurrentevent_data.remove(&key).unwrap(); } - }; - }, - Some((key, value)) = receiver.next() => { - if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key, value) { + + // Find events that have been added since starting the last request + let new_events = sending.servernameevent_data + .scan_prefix(prefix.clone()) + .filter_map(|(k, v)| { + Self::parse_servercurrentevent(&k, v).ok().map(|ev| (ev, k)) + }) + .take(30) + .collect::>(); + + // TODO: find edus + + if !new_events.is_empty() { + // Insert pdus we found + for (e, key) in &new_events { + let value = if let SendingEventType::Edu(value) = &e.1 { &**value } else { &[] }; + sending.servercurrentevent_data.insert(&key, value).unwrap(); + sending.servernameevent_data.remove(&key).unwrap(); + } + + // Clear retries + current_transaction_status.insert(prefix, TransactionStatus::Running); + + outgoing.push( + Self::handle_events( + outgoing_kind.clone(), + new_events.into_iter().map(|(event, _)| event.1).collect(), + Arc::clone(&db), + ) + ); + } else { + current_transaction_status.remove(&prefix); + } + } + + // Outgoing transaction failed + Err((outgoing_kind, err)) => { + // Set status to Failed, create timer + let timer = Self::generate_next_timer(&mut current_transaction_status, outgoing_kind.get_prefix()); + + // Add timer to loop + timers.push(timer); + + let destination = match outgoing_kind.describe() { + Ok(d) => d, + Err(err) => { + warn!("Could not describe {:?}: {}", outgoing_kind, err); + continue; + }, + }; + + warn!("Outgoing request to {} failed: {}", destination, err); + } + }; + }, + + // Transaction retry timers firing + Some(prefix) = timers.next() => { + match Self::parse_servercurrentevent(&prefix, vec![]) { + Ok((outgoing_kind, _)) => { let guard = db.read().await; - if let Ok(Some(events)) = Self::select_events( + // Transition Failed => Retrying, return pending old transaction events + match Self::prepare_transaction( &outgoing_kind, - vec![(event, key)], + vec![], // will be ignored because retry == true anyways &mut current_transaction_status, - &guard + &guard, + true, ) { - futures.push(Self::handle_events(outgoing_kind, events, Arc::clone(&db))); + Ok(Some(events)) => { + outgoing.push(Self::handle_events(outgoing_kind, events, Arc::clone(&db))); + } + Ok(None) => { + // Unreachable because retry == true + unreachable!("prepare_transaction went over to Ok(None), while being fired by timer (for {:?})", outgoing_kind) + } + + Err(err) => { + error!("prepare_transaction produced an error for {:?}; {}", outgoing_kind, err) + } } + }, + Err(err) => { + error!("parse_servercurrentevent produced an error for {:?}; {}", prefix, err) } } - } + }, + + // Explicit wakeups, makes an in-flight timer return immediately + Some(outgoing) = waker.next() => { + let prefix = outgoing.get_prefix(); + if let Some(TransactionStatus::Failed { wake, .. }) = current_transaction_status.get_mut(&prefix) { + if let Some(wake) = wake.take() { + let _ = wake.send(()); + } + } + }, } - }); + } } - #[tracing::instrument(skip(outgoing_kind, new_events, current_transaction_status, db))] - fn select_events( + /// Generates timer/oneshot, alters status to reflect Failed + /// + /// Returns timer/oneshot future to wake up loop for next retry + fn generate_next_timer( + status: &mut HashMap, TransactionStatus>, + prefix: Vec, + ) -> impl Future> { + let now = Instant::now(); + + let entry = status + .get_mut(&prefix) + .expect("guaranteed to be set before this function"); + + let new_tries = match entry { + // Running -> Failed + TransactionStatus::Running => 1, + // Retrying -> Failed + TransactionStatus::Retrying { tries } => *tries + 1, + + // The transition of Failed -> Retrying is handled in another loop, this is impossible + TransactionStatus::Failed { .. } => { + unreachable!("Request that was not even running failed?!") + } + }; + + const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24); + + // Exponential backoff, clamp upper value to one day + let next_wakeup = now + (Duration::from_secs(30) * new_tries * new_tries).min(ONE_DAY); + + let (wake, fut) = make_timeout(next_wakeup, prefix); + + *entry = TransactionStatus::Failed { + tries: new_tries, + wake: Some(wake), + }; + + fut + } + + /// Wake up any timers for a particular outgoing kind. + pub fn wake_outgoing(&self, outgoing: OutgoingKind) { + let _ = self.waker.unbounded_send(outgoing); + } + + #[tracing::instrument(skip(outgoing_kind, new_events, status, db))] + fn prepare_transaction( outgoing_kind: &OutgoingKind, new_events: Vec<(SendingEventType, Vec)>, // Events we want to send: event and full key - current_transaction_status: &mut HashMap, TransactionStatus>, + status: &mut HashMap, TransactionStatus>, db: &Database, + retry: bool, ) -> Result>> { - let mut retry = false; - let mut allow = true; - let prefix = outgoing_kind.get_prefix(); - let entry = current_transaction_status.entry(prefix.clone()); - - entry - .and_modify(|e| match e { - TransactionStatus::Running | TransactionStatus::Retrying(_) => { - allow = false; // already running - } - TransactionStatus::Failed(tries, time) => { - // Fail if a request has failed recently (exponential backoff) - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - if time.elapsed() < min_elapsed_duration { - allow = false; - } else { - retry = true; - *e = TransactionStatus::Retrying(*tries); - } - } - }) - .or_insert(TransactionStatus::Running); - - if !allow { + // If we're not retrying, and there's already an entry in the hashmap (retrying, failed, or running), bail. + // + // Transitioning from Failed to Retrying is not this function's responsibility, it's the timer loop's job. + // + // When retrying, it's asserted that the hashmap exists and has Failed, if it doesn't, its an error state. + if !retry && status.get(&prefix).is_some() { return Ok(None); + } else if retry { + // Retrying, asserting hashmap is Failed, bail otherwise. + let tries: u32 = match status.get(&prefix) { + Some(txn_status) => match txn_status { + TransactionStatus::Failed { tries, .. } => Ok(tries.clone()), + _ => Err(Error::bad_state("state hashmap entry was not Failed")), + }, + None => Err(Error::bad_state("state hashmap entry was absent")), + }?; + // When transitioning Retrying => Failed, tries was already incremented, no need to do it here. + status.insert(prefix.clone(), TransactionStatus::Retrying { tries }); + } else { + // Not retrying, hashmap didnt exist + status.insert(prefix.clone(), TransactionStatus::Running); } let mut events = Vec::new(); diff --git a/src/error.rs b/src/error.rs index 1ecef3a..dcec0e2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -59,10 +59,14 @@ pub enum Error { #[error("{0}")] BadServerResponse(&'static str), #[error("{0}")] + /// Don't create this directly. Use Error::bad_config instead. BadConfig(&'static str), #[error("{0}")] /// Don't create this directly. Use Error::bad_database instead. BadDatabase(&'static str), + #[error("{0}")] + /// Don't create this directly. Use Error::bad_state instead. + BadState(&'static str), #[error("uiaa")] Uiaa(UiaaInfo), #[error("{0}: {1}")] @@ -81,6 +85,11 @@ impl Error { error!("BadConfig: {}", message); Self::BadConfig(message) } + + pub fn bad_state(message: &'static str) -> Self { + error!("BadState: {}", message); + Self::BadState(message) + } } impl Error {