diff --git a/src/appservice_server.rs b/src/appservice_server.rs index 7868e45..9fc7dce 100644 --- a/src/appservice_server.rs +++ b/src/appservice_server.rs @@ -46,7 +46,11 @@ where *reqwest_request.timeout_mut() = Some(Duration::from_secs(30)); let url = reqwest_request.url().clone(); - let mut response = globals.reqwest_client().execute(reqwest_request).await?; + let mut response = globals + .reqwest_client()? + .build()? + .execute(reqwest_request) + .await?; // reqwest::Response -> http::Response conversion let status = response.status(); diff --git a/src/client_server/keys.rs b/src/client_server/keys.rs index 8db7688..0815737 100644 --- a/src/client_server/keys.rs +++ b/src/client_server/keys.rs @@ -1,5 +1,6 @@ use super::SESSION_ID_LENGTH; use crate::{database::DatabaseGuard, utils, ConduitResult, Database, Error, Result, Ruma}; +use rocket::futures::{prelude::*, stream::FuturesUnordered}; use ruma::{ api::{ client::{ @@ -18,7 +19,7 @@ use ruma::{ DeviceId, DeviceKeyAlgorithm, UserId, }; use serde_json::json; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; #[cfg(feature = "conduit_bin")] use rocket::{get, post}; @@ -294,7 +295,7 @@ pub async fn get_keys_helper bool>( let mut user_signing_keys = BTreeMap::new(); let mut device_keys = BTreeMap::new(); - let mut get_over_federation = BTreeMap::new(); + let mut get_over_federation = HashMap::new(); for (user_id, device_ids) in device_keys_input { if user_id.server_name() != db.globals.server_name() { @@ -364,22 +365,30 @@ pub async fn get_keys_helper bool>( let mut failures = BTreeMap::new(); - for (server, vec) in get_over_federation { - let mut device_keys_input_fed = BTreeMap::new(); - for (user_id, keys) in vec { - device_keys_input_fed.insert(user_id.clone(), keys.clone()); - } - match db - .sending - .send_federation_request( - &db.globals, + let mut futures = get_over_federation + .into_iter() + .map(|(server, vec)| async move { + let mut device_keys_input_fed = BTreeMap::new(); + for (user_id, keys) in vec { + device_keys_input_fed.insert(user_id.clone(), keys.clone()); + } + ( server, - federation::keys::get_keys::v1::Request { - device_keys: device_keys_input_fed, - }, + db.sending + .send_federation_request( + &db.globals, + server, + federation::keys::get_keys::v1::Request { + device_keys: device_keys_input_fed, + }, + ) + .await, ) - .await - { + }) + .collect::>(); + + while let Some((server, response)) = futures.next().await { + match response { Ok(response) => { master_keys.extend(response.master_keys); self_signing_keys.extend(response.self_signing_keys); @@ -430,13 +439,15 @@ pub async fn claim_keys_helper( one_time_keys.insert(user_id.clone(), container); } + let mut failures = BTreeMap::new(); + for (server, vec) in get_over_federation { let mut one_time_keys_input_fed = BTreeMap::new(); for (user_id, keys) in vec { one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); } // Ignore failures - let keys = db + if let Ok(keys) = db .sending .send_federation_request( &db.globals, @@ -445,13 +456,16 @@ pub async fn claim_keys_helper( one_time_keys: one_time_keys_input_fed, }, ) - .await?; - - one_time_keys.extend(keys.one_time_keys); + .await + { + one_time_keys.extend(keys.one_time_keys); + } else { + failures.insert(server.to_string(), json!({})); + } } Ok(claim_keys::Response { - failures: BTreeMap::new(), + failures, one_time_keys, }) } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 29926e3..e6ebf84 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -69,6 +69,7 @@ pub async fn join_room_by_id_route( &db, body.sender_user.as_ref(), &body.room_id, + body.reason.clone(), &servers, body.third_party_signed.as_ref(), ) @@ -120,6 +121,7 @@ pub async fn join_room_by_id_or_alias_route( &db, body.sender_user.as_ref(), &room_id, + body.reason.clone(), &servers, body.third_party_signed.as_ref(), ) @@ -144,7 +146,9 @@ pub async fn leave_room_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - db.rooms.leave_room(sender_user, &body.room_id, &db).await?; + db.rooms + .leave_room(sender_user, &body.room_id, body.reason.clone(), &db) + .await?; db.flush()?; @@ -163,7 +167,15 @@ pub async fn invite_user_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); if let invite_user::IncomingInvitationRecipient::UserId { user_id } = &body.recipient { - invite_helper(sender_user, user_id, &body.room_id, &db, false).await?; + invite_helper( + sender_user, + user_id, + &body.room_id, + body.reason.clone(), + &db, + false, + ) + .await?; db.flush()?; Ok(invite_user::Response {}.into()) } else { @@ -201,7 +213,7 @@ pub async fn kick_user_route( .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = ruma::events::room::member::MembershipState::Leave; - // TODO: reason + event.reason = body.reason.clone(); let mutex_state = Arc::clone( db.globals @@ -245,8 +257,6 @@ pub async fn ban_user_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - // TODO: reason - let event = db .rooms .room_state_get( @@ -262,7 +272,7 @@ pub async fn ban_user_route( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&body.user_id)?, - reason: None, + reason: body.reason.clone(), }), |event| { let mut event = serde_json::from_value::>( @@ -272,6 +282,7 @@ pub async fn ban_user_route( .deserialize() .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = ruma::events::room::member::MembershipState::Ban; + event.reason = body.reason.clone(); Ok(event) }, )?; @@ -337,6 +348,7 @@ pub async fn unban_user_route( .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = ruma::events::room::member::MembershipState::Leave; + event.reason = body.reason.clone(); let mutex_state = Arc::clone( db.globals @@ -482,6 +494,7 @@ async fn join_room_by_id_helper( db: &Database, sender_user: Option<&UserId>, room_id: &RoomId, + reason: Option, servers: &HashSet>, _third_party_signed: Option<&IncomingThirdPartySigned>, ) -> ConduitResult { @@ -564,7 +577,7 @@ async fn join_room_by_id_helper( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&sender_user)?, - reason: None, + reason, }) .expect("event is valid, we just created it"), ); @@ -714,7 +727,7 @@ async fn join_room_by_id_helper( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&sender_user)?, - reason: None, + reason, }; db.rooms.build_and_append_pdu( @@ -807,6 +820,7 @@ pub async fn invite_helper<'a>( sender_user: &UserId, user_id: &UserId, room_id: &RoomId, + reason: Option, db: &Database, is_direct: bool, ) -> Result<()> { @@ -869,7 +883,7 @@ pub async fn invite_helper<'a>( membership: MembershipState::Invite, third_party_invite: None, blurhash: None, - reason: None, + reason, }) .expect("member event is valid value"); @@ -1064,7 +1078,7 @@ pub async fn invite_helper<'a>( is_direct: Some(is_direct), third_party_invite: None, blurhash: db.users.blurhash(&user_id)?, - reason: None, + reason, }) .expect("event is valid, we just created it"), unsigned: None, diff --git a/src/database/globals.rs b/src/database/globals.rs index 823ce34..9fdc130 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -1,4 +1,4 @@ -use crate::{database::Config, utils, ConduitResult, Error, Result}; +use crate::{database::Config, server_server::FedDest, utils, ConduitResult, Error, Result}; use ruma::{ api::{ client::r0::sync::sync_events, @@ -6,25 +6,25 @@ use ruma::{ }, DeviceId, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, ServerSigningKeyId, UserId, }; -use rustls::{ServerCertVerifier, WebPKIVerifier}; use std::{ collections::{BTreeMap, HashMap}, fs, future::Future, + net::IpAddr, path::PathBuf, sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }; use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore}; -use tracing::{error, info}; +use tracing::error; use trust_dns_resolver::TokioAsyncResolver; use super::abstraction::Tree; pub const COUNTER: &[u8] = b"c"; -type WellKnownMap = HashMap, (String, String)>; -type TlsNameMap = HashMap; +type WellKnownMap = HashMap, (FedDest, String)>; +type TlsNameMap = HashMap>; type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries type SyncHandle = ( Option, // since @@ -37,7 +37,6 @@ pub struct Globals { pub(super) globals: Arc, config: Config, keypair: Arc, - reqwest_client: reqwest::Client, dns_resolver: TokioAsyncResolver, jwt_decoding_key: Option>, pub(super) server_signingkeys: Arc, @@ -51,40 +50,6 @@ pub struct Globals { pub rotate: RotationHandler, } -struct MatrixServerVerifier { - inner: WebPKIVerifier, - tls_name_override: Arc>, -} - -impl ServerCertVerifier for MatrixServerVerifier { - #[tracing::instrument(skip(self, roots, presented_certs, dns_name, ocsp_response))] - fn verify_server_cert( - &self, - roots: &rustls::RootCertStore, - presented_certs: &[rustls::Certificate], - dns_name: webpki::DNSNameRef<'_>, - ocsp_response: &[u8], - ) -> std::result::Result { - if let Some(override_name) = self.tls_name_override.read().unwrap().get(dns_name.into()) { - let result = self.inner.verify_server_cert( - roots, - presented_certs, - override_name.as_ref(), - ocsp_response, - ); - if result.is_ok() { - return result; - } - info!( - "Server {:?} is non-compliant, retrying TLS verification with original name", - dns_name - ); - } - self.inner - .verify_server_cert(roots, presented_certs, dns_name, ocsp_response) - } -} - /// Handles "rotation" of long-polling requests. "Rotation" in this context is similar to "rotation" of log files and the like. /// /// This is utilized to have sync workers return early and release read locks on the database. @@ -162,24 +127,6 @@ impl Globals { }; let tls_name_override = Arc::new(RwLock::new(TlsNameMap::new())); - let verifier = Arc::new(MatrixServerVerifier { - inner: WebPKIVerifier::new(), - tls_name_override: tls_name_override.clone(), - }); - let mut tlsconfig = rustls::ClientConfig::new(); - tlsconfig.dangerous().set_certificate_verifier(verifier); - tlsconfig.root_store = - rustls_native_certs::load_native_certs().expect("Error loading system certificates"); - - let mut reqwest_client_builder = reqwest::Client::builder() - .connect_timeout(Duration::from_secs(30)) - .timeout(Duration::from_secs(60 * 3)) - .pool_max_idle_per_host(1) - .use_preconfigured_tls(tlsconfig); - if let Some(proxy) = config.proxy.to_proxy()? { - reqwest_client_builder = reqwest_client_builder.proxy(proxy); - } - let reqwest_client = reqwest_client_builder.build().unwrap(); let jwt_decoding_key = config .jwt_secret @@ -190,7 +137,6 @@ impl Globals { globals, config, keypair: Arc::new(keypair), - reqwest_client, dns_resolver: TokioAsyncResolver::tokio_from_system_conf().map_err(|_| { Error::bad_config("Failed to set up trust dns resolver with system config.") })?, @@ -219,8 +165,17 @@ impl Globals { } /// Returns a reqwest client which can be used to send requests. - pub fn reqwest_client(&self) -> &reqwest::Client { - &self.reqwest_client + pub fn reqwest_client(&self) -> Result { + let mut reqwest_client_builder = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(30)) + .timeout(Duration::from_secs(60 * 3)) + .pool_max_idle_per_host(1); + //.use_preconfigured_tls(tlsconfig); + if let Some(proxy) = self.config.proxy.to_proxy()? { + reqwest_client_builder = reqwest_client_builder.proxy(proxy); + } + + Ok(reqwest_client_builder) } #[tracing::instrument(skip(self))] diff --git a/src/database/pusher.rs b/src/database/pusher.rs index 3df9ed4..da4a6e7 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -113,7 +113,11 @@ where //*reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); let url = reqwest_request.url().clone(); - let response = globals.reqwest_client().execute(reqwest_request).await; + let response = globals + .reqwest_client()? + .build()? + .execute(reqwest_request) + .await; match response { Ok(mut response) => { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index e0ffded..ffc8dd1 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -2482,6 +2482,7 @@ impl Rooms { &self, user_id: &UserId, room_id: &RoomId, + reason: Option, db: &Database, ) -> Result<()> { // Ask a remote server if we don't have this room @@ -2530,6 +2531,7 @@ impl Rooms { .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = member::MembershipState::Leave; + event.reason = reason; self.build_and_append_pdu( PduBuilder { diff --git a/src/server_server.rs b/src/server_server.rs index 65fd4a8..e220000 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -83,7 +83,7 @@ use rocket::{get, post, put}; /// FedDest::Named("198.51.100.5".to_owned(), "".to_owned()); /// ``` #[derive(Clone, Debug, PartialEq)] -enum FedDest { +pub enum FedDest { Literal(SocketAddr), Named(String, String), } @@ -109,6 +109,13 @@ impl FedDest { Self::Named(host, _) => host.clone(), } } + + fn port(&self) -> Option { + match &self { + Self::Literal(addr) => Some(addr.port()), + Self::Named(_, port) => port[1..].parse().ok(), + } + } } #[tracing::instrument(skip(globals, request))] @@ -124,41 +131,34 @@ where return Err(Error::bad_config("Federation is disabled.")); } - let maybe_result = globals + let mut write_destination_to_cache = false; + + let cached_result = globals .actual_destination_cache .read() .unwrap() .get(destination) .cloned(); - let (actual_destination, host) = if let Some(result) = maybe_result { + let (actual_destination, host) = if let Some(result) = cached_result { result } else { + write_destination_to_cache = true; + let result = find_actual_destination(globals, &destination).await; - let (actual_destination, host) = result.clone(); - let result_string = (result.0.into_https_string(), result.1.into_uri_string()); - globals - .actual_destination_cache - .write() - .unwrap() - .insert(Box::::from(destination), result_string.clone()); - let dest_hostname = actual_destination.hostname(); - let host_hostname = host.hostname(); - if dest_hostname != host_hostname { - globals.tls_name_override.write().unwrap().insert( - dest_hostname, - webpki::DNSNameRef::try_from_ascii_str(&host_hostname) - .unwrap() - .to_owned(), - ); - } - result_string + + (result.0, result.1.clone().into_uri_string()) }; + let actual_destination_str = actual_destination.clone().into_https_string(); + let mut http_request = request - .try_into_http_request::>(&actual_destination, SendAccessToken::IfRequired("")) + .try_into_http_request::>(&actual_destination_str, SendAccessToken::IfRequired("")) .map_err(|e| { - warn!("Failed to find destination {}: {}", actual_destination, e); + warn!( + "Failed to find destination {}: {}", + actual_destination_str, e + ); Error::BadServerResponse("Invalid destination") })?; @@ -232,7 +232,22 @@ where .expect("all http requests are valid reqwest requests"); let url = reqwest_request.url().clone(); - let response = globals.reqwest_client().execute(reqwest_request).await; + + let mut client = globals.reqwest_client()?; + if let Some(override_name) = globals + .tls_name_override + .read() + .unwrap() + .get(&actual_destination.hostname()) + { + client = client.resolve( + &actual_destination.hostname(), + SocketAddr::new(override_name[0], 0), + ); + // port will be ignored + } + + let response = client.build()?.execute(reqwest_request).await; match response { Ok(mut response) => { @@ -271,6 +286,13 @@ where if status == 200 { let response = T::IncomingResponse::try_from_http_response(http_response); + if response.is_ok() && write_destination_to_cache { + globals.actual_destination_cache.write().unwrap().insert( + Box::::from(destination), + (actual_destination, host), + ); + } + response.map_err(|e| { warn!( "Invalid 200 response from {} on: {} {}", @@ -343,16 +365,46 @@ async fn find_actual_destination( match get_ip_with_port(&delegated_hostname) { Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file None => { - if let Some(pos) = destination_str.find(':') { + if let Some(pos) = delegated_hostname.find(':') { // 3.2: Hostname with port in .well-known file - let (host, port) = destination_str.split_at(pos); + let (host, port) = delegated_hostname.split_at(pos); FedDest::Named(host.to_string(), port.to_string()) } else { - match query_srv_record(globals, &delegated_hostname).await { + if let Some(hostname_override) = + query_srv_record(globals, &delegated_hostname).await + { // 3.3: SRV lookup successful - Some(hostname) => hostname, + let host = match delegated_hostname.find(':') { + None => delegated_hostname.clone(), + Some(pos) => { + delegated_hostname.split_at(pos).0.to_owned() + } + }; + + if let Ok(override_ip) = globals + .dns_resolver() + .lookup_ip(hostname_override.hostname()) + .await + { + globals + .tls_name_override + .write() + .unwrap() + .insert(host.clone(), override_ip.iter().collect()); + } else { + warn!("Using SRV record, but could not resolve to IP"); + } + + let force_port = hostname_override.port(); + + if let Some(port) = force_port { + FedDest::Named(host, format!(":{}", port.to_string())) + } else { + add_port_to_hostname(&delegated_hostname) + } + } else { // 3.4: No SRV records, just use the hostname from .well-known - None => add_port_to_hostname(&delegated_hostname), + add_port_to_hostname(&delegated_hostname) } } } @@ -423,6 +475,9 @@ pub async fn request_well_known( let body: serde_json::Value = serde_json::from_str( &globals .reqwest_client() + .ok()? + .build() + .ok()? .get(&format!( "https://{}/.well-known/matrix/server", destination @@ -1980,15 +2035,9 @@ fn get_auth_chain( let mut buckets = vec![BTreeSet::new(); NUM_BUCKETS]; for id in starting_events { - if let Some(pdu) = db.rooms.get_pdu(&id)? { - for auth_event in &pdu.auth_events { - let short = db - .rooms - .get_or_create_shorteventid(&auth_event, &db.globals)?; - let bucket_id = (short % NUM_BUCKETS as u64) as usize; - buckets[bucket_id].insert((short, auth_event.clone())); - } - } + let short = db.rooms.get_or_create_shorteventid(&id, &db.globals)?; + let bucket_id = (short % NUM_BUCKETS as u64) as usize; + buckets[bucket_id].insert((short, id.clone())); } let mut full_auth_chain = HashSet::new(); @@ -2000,10 +2049,6 @@ fn get_auth_chain( continue; } - // The code below will only get the auth chains, not the events in the chunk. So let's add - // them first - full_auth_chain.extend(chunk.iter().map(|(id, _)| id)); - let chunk_key = chunk .iter() .map(|(short, _)| short)