|
|
|
@ -438,25 +438,18 @@ impl Rooms { |
|
|
|
&self, |
|
|
|
&self, |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu: &PduEvent, |
|
|
|
pdu_json: &serde_json::Value, |
|
|
|
pdu_json: &serde_json::Value, |
|
|
|
|
|
|
|
count: u64, |
|
|
|
|
|
|
|
pdu_id: IVec, |
|
|
|
globals: &super::globals::Globals, |
|
|
|
globals: &super::globals::Globals, |
|
|
|
account_data: &super::account_data::AccountData, |
|
|
|
account_data: &super::account_data::AccountData, |
|
|
|
sending: &super::sending::Sending, |
|
|
|
sending: &super::sending::Sending, |
|
|
|
) -> Result<Vec<u8>> { |
|
|
|
) -> Result<()> { |
|
|
|
self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; |
|
|
|
self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; |
|
|
|
|
|
|
|
|
|
|
|
// Increment the last index and use that
|
|
|
|
|
|
|
|
// This is also the next_batch/since value
|
|
|
|
|
|
|
|
let index = globals.next_count()?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Mark as read first so the sending client doesn't get a notification even if appending
|
|
|
|
// Mark as read first so the sending client doesn't get a notification even if appending
|
|
|
|
// fails
|
|
|
|
// fails
|
|
|
|
self.edus |
|
|
|
self.edus |
|
|
|
.private_read_set(&pdu.room_id, &pdu.sender, index, &globals)?; |
|
|
|
.private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?; |
|
|
|
|
|
|
|
|
|
|
|
let room_id = pdu.room_id.clone(); |
|
|
|
|
|
|
|
let mut pdu_id = room_id.as_bytes().to_vec(); |
|
|
|
|
|
|
|
pdu_id.push(0xff); |
|
|
|
|
|
|
|
pdu_id.extend_from_slice(&index.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?; |
|
|
|
self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?; |
|
|
|
|
|
|
|
|
|
|
|
@ -537,7 +530,7 @@ impl Rooms { |
|
|
|
}, |
|
|
|
}, |
|
|
|
&UserId::try_from(format!("@conduit:{}", globals.server_name())) |
|
|
|
&UserId::try_from(format!("@conduit:{}", globals.server_name())) |
|
|
|
.expect("@conduit:server_name is valid"), |
|
|
|
.expect("@conduit:server_name is valid"), |
|
|
|
&room_id, |
|
|
|
&pdu.room_id, |
|
|
|
&globals, |
|
|
|
&globals, |
|
|
|
&sending, |
|
|
|
&sending, |
|
|
|
&account_data, |
|
|
|
&account_data, |
|
|
|
@ -549,7 +542,7 @@ impl Rooms { |
|
|
|
_ => {} |
|
|
|
_ => {} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Ok(pdu_id) |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Generates a new StateHash and associates it with the incoming event.
|
|
|
|
/// Generates a new StateHash and associates it with the incoming event.
|
|
|
|
@ -834,10 +827,27 @@ impl Rooms { |
|
|
|
.expect("json is object") |
|
|
|
.expect("json is object") |
|
|
|
.insert("event_id".to_owned(), pdu.event_id.to_string().into()); |
|
|
|
.insert("event_id".to_owned(), pdu.event_id.to_string().into()); |
|
|
|
|
|
|
|
|
|
|
|
let pdu_id = self.append_pdu(&pdu, &pdu_json, globals, account_data, sending)?; |
|
|
|
// Increment the last index and use that
|
|
|
|
|
|
|
|
// This is also the next_batch/since value
|
|
|
|
|
|
|
|
let count = globals.next_count()?; |
|
|
|
|
|
|
|
let mut pdu_id = room_id.as_bytes().to_vec(); |
|
|
|
|
|
|
|
pdu_id.push(0xff); |
|
|
|
|
|
|
|
pdu_id.extend_from_slice(&count.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
|
|
|
|
|
|
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
|
|
|
self.append_to_state(&pdu_id, &pdu)?; |
|
|
|
self.append_to_state(&pdu_id, &pdu)?; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.append_pdu( |
|
|
|
|
|
|
|
&pdu, |
|
|
|
|
|
|
|
&pdu_json, |
|
|
|
|
|
|
|
count, |
|
|
|
|
|
|
|
pdu_id.clone().into(), |
|
|
|
|
|
|
|
globals, |
|
|
|
|
|
|
|
account_data, |
|
|
|
|
|
|
|
sending, |
|
|
|
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
|
|
for server in self |
|
|
|
for server in self |
|
|
|
.room_servers(room_id) |
|
|
|
.room_servers(room_id) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
|