|
|
|
@ -64,6 +64,7 @@ use std::{ |
|
|
|
future::Future, |
|
|
|
future::Future, |
|
|
|
mem, |
|
|
|
mem, |
|
|
|
net::{IpAddr, SocketAddr}, |
|
|
|
net::{IpAddr, SocketAddr}, |
|
|
|
|
|
|
|
ops::Deref, |
|
|
|
pin::Pin, |
|
|
|
pin::Pin, |
|
|
|
sync::{Arc, RwLock, RwLockWriteGuard}, |
|
|
|
sync::{Arc, RwLock, RwLockWriteGuard}, |
|
|
|
time::{Duration, Instant, SystemTime}, |
|
|
|
time::{Duration, Instant, SystemTime}, |
|
|
|
@ -1636,7 +1637,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
db, |
|
|
|
db, |
|
|
|
&incoming_pdu, |
|
|
|
&incoming_pdu, |
|
|
|
val, |
|
|
|
val, |
|
|
|
extremities, |
|
|
|
extremities.iter().map(Deref::deref), |
|
|
|
state_ids_compressed, |
|
|
|
state_ids_compressed, |
|
|
|
soft_fail, |
|
|
|
soft_fail, |
|
|
|
&state_lock, |
|
|
|
&state_lock, |
|
|
|
@ -1821,7 +1822,7 @@ async fn upgrade_outlier_to_timeline_pdu( |
|
|
|
db, |
|
|
|
db, |
|
|
|
&incoming_pdu, |
|
|
|
&incoming_pdu, |
|
|
|
val, |
|
|
|
val, |
|
|
|
extremities, |
|
|
|
extremities.iter().map(Deref::deref), |
|
|
|
state_ids_compressed, |
|
|
|
state_ids_compressed, |
|
|
|
soft_fail, |
|
|
|
soft_fail, |
|
|
|
&state_lock, |
|
|
|
&state_lock, |
|
|
|
@ -2114,11 +2115,11 @@ pub(crate) async fn fetch_signing_keys( |
|
|
|
/// Append the incoming event setting the state snapshot to the state from the
|
|
|
|
/// Append the incoming event setting the state snapshot to the state from the
|
|
|
|
/// server that sent the event.
|
|
|
|
/// server that sent the event.
|
|
|
|
#[tracing::instrument(skip(db, pdu, pdu_json, new_room_leaves, state_ids_compressed, _mutex_lock))] |
|
|
|
#[tracing::instrument(skip(db, pdu, pdu_json, new_room_leaves, state_ids_compressed, _mutex_lock))] |
|
|
|
fn append_incoming_pdu( |
|
|
|
fn append_incoming_pdu<'a>( |
|
|
|
db: &Database, |
|
|
|
db: &Database, |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu_json: CanonicalJsonObject, |
|
|
|
pdu_json: CanonicalJsonObject, |
|
|
|
new_room_leaves: HashSet<Box<EventId>>, |
|
|
|
new_room_leaves: impl IntoIterator<Item = &'a EventId> + Clone + Debug, |
|
|
|
state_ids_compressed: HashSet<CompressedStateEvent>, |
|
|
|
state_ids_compressed: HashSet<CompressedStateEvent>, |
|
|
|
soft_fail: bool, |
|
|
|
soft_fail: bool, |
|
|
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
|
|
|
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
|
|
|
|
@ -2135,19 +2136,12 @@ fn append_incoming_pdu( |
|
|
|
if soft_fail { |
|
|
|
if soft_fail { |
|
|
|
db.rooms |
|
|
|
db.rooms |
|
|
|
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; |
|
|
|
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; |
|
|
|
db.rooms.replace_pdu_leaves( |
|
|
|
db.rooms |
|
|
|
&pdu.room_id, |
|
|
|
.replace_pdu_leaves(&pdu.room_id, new_room_leaves.clone())?; |
|
|
|
&new_room_leaves.into_iter().collect::<Vec<_>>(), |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
return Ok(None); |
|
|
|
return Ok(None); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let pdu_id = db.rooms.append_pdu( |
|
|
|
let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?; |
|
|
|
pdu, |
|
|
|
|
|
|
|
pdu_json, |
|
|
|
|
|
|
|
&new_room_leaves.into_iter().collect::<Vec<_>>(), |
|
|
|
|
|
|
|
db, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for appservice in db.appservice.all()? { |
|
|
|
for appservice in db.appservice.all()? { |
|
|
|
if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { |
|
|
|
if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { |
|
|
|
|