|
|
|
|
@ -1,11 +1,15 @@
@@ -1,11 +1,15 @@
|
|
|
|
|
use crate::{utils, Database, PduEvent}; |
|
|
|
|
use ruma_events::{collections::only::Event as EduEvent, EventJson, EventType}; |
|
|
|
|
use crate::{database::COUNTER, utils, Database, PduEvent}; |
|
|
|
|
use ruma_events::{ |
|
|
|
|
collections::only::Event as EduEvent, room::power_levels::PowerLevelsEventContent, EventJson, |
|
|
|
|
EventType, |
|
|
|
|
}; |
|
|
|
|
use ruma_federation_api::RoomV3Pdu; |
|
|
|
|
use ruma_identifiers::{EventId, RoomId, UserId}; |
|
|
|
|
use serde_json::json; |
|
|
|
|
use std::{ |
|
|
|
|
collections::HashMap, |
|
|
|
|
convert::{TryFrom, TryInto}, |
|
|
|
|
mem, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
pub struct Data { |
|
|
|
|
@ -189,7 +193,14 @@ impl Data {
@@ -189,7 +193,14 @@ impl Data {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_join(&self, room_id: &RoomId, user_id: &UserId) -> bool { |
|
|
|
|
if !self.room_exists(room_id) { |
|
|
|
|
if !self.room_exists(room_id) |
|
|
|
|
&& !self |
|
|
|
|
.db |
|
|
|
|
.userid_joinroomids |
|
|
|
|
.get_iter(user_id.to_string().as_bytes()) |
|
|
|
|
.values() |
|
|
|
|
.any(|r| r.unwrap() == room_id.to_string().as_bytes()) |
|
|
|
|
{ |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -249,8 +260,7 @@ impl Data {
@@ -249,8 +260,7 @@ impl Data {
|
|
|
|
|
/// Check if a room exists by looking for PDUs in that room.
|
|
|
|
|
pub fn room_exists(&self, room_id: &RoomId) -> bool { |
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
|
let mut prefix = vec![b'd']; |
|
|
|
|
prefix.extend_from_slice(room_id.to_string().as_bytes()); |
|
|
|
|
let mut prefix = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id
|
|
|
|
|
|
|
|
|
|
if let Some((key, _)) = self.db.pduid_pdu.get_gt(&prefix).unwrap() { |
|
|
|
|
@ -397,13 +407,14 @@ impl Data {
@@ -397,13 +407,14 @@ impl Data {
|
|
|
|
|
.collect() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_pdu_first(&self, room_id: &RoomId, pdu_index: u64) -> bool { |
|
|
|
|
let mut pdu_id = vec![b'd']; |
|
|
|
|
pdu_id.extend_from_slice(room_id.to_string().as_bytes()); |
|
|
|
|
pdu_id.push(0xff); |
|
|
|
|
pdu_id.extend_from_slice(&pdu_index.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
self.db.pduid_pdu.get_lt(&pdu_id).unwrap().is_none() |
|
|
|
|
pub fn pdu_get_count(&self, event_id: &EventId) -> Option<u64> { |
|
|
|
|
self.db |
|
|
|
|
.eventid_pduid |
|
|
|
|
.get(event_id.to_string().as_bytes()) |
|
|
|
|
.unwrap() |
|
|
|
|
.map(|pdu_id| { |
|
|
|
|
utils::u64_from_bytes(&pdu_id[pdu_id.len() - mem::size_of::<u64>()..pdu_id.len()]) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn pdu_get(&self, event_id: &EventId) -> Option<RoomV3Pdu> { |
|
|
|
|
@ -459,7 +470,39 @@ impl Data {
@@ -459,7 +470,39 @@ impl Data {
|
|
|
|
|
content: serde_json::Value, |
|
|
|
|
unsigned: Option<serde_json::Map<String, serde_json::Value>>, |
|
|
|
|
state_key: Option<String>, |
|
|
|
|
) -> EventId { |
|
|
|
|
) -> Option<EventId> { |
|
|
|
|
// Is the event authorized?
|
|
|
|
|
if state_key.is_some() { |
|
|
|
|
if let Some(pdu) = self |
|
|
|
|
.room_state(&room_id) |
|
|
|
|
.get(&(EventType::RoomPowerLevels, "".to_owned())) |
|
|
|
|
{ |
|
|
|
|
let power_levels = serde_json::from_value::<EventJson<PowerLevelsEventContent>>( |
|
|
|
|
pdu.content.clone(), |
|
|
|
|
) |
|
|
|
|
.unwrap() |
|
|
|
|
.deserialize() |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
match event_type { |
|
|
|
|
EventType::RoomMember => { |
|
|
|
|
// Member events are okay for now (TODO)
|
|
|
|
|
} |
|
|
|
|
_ if power_levels |
|
|
|
|
.users |
|
|
|
|
.get(&sender) |
|
|
|
|
.unwrap_or(&power_levels.users_default) |
|
|
|
|
<= &0.into() => |
|
|
|
|
{ |
|
|
|
|
// Not authorized
|
|
|
|
|
return None; |
|
|
|
|
} |
|
|
|
|
// User has sufficient power
|
|
|
|
|
_ => {} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// prev_events are the leaves of the current graph. This method removes all leaves from the
|
|
|
|
|
// room and replaces them with our event
|
|
|
|
|
// TODO: Make sure this isn't called twice in parallel
|
|
|
|
|
@ -523,22 +566,19 @@ impl Data {
@@ -523,22 +566,19 @@ impl Data {
|
|
|
|
|
|
|
|
|
|
self.pdu_leaves_replace(&room_id, &pdu.event_id); |
|
|
|
|
|
|
|
|
|
// The new value will need a new index. We store the last used index in 'n'
|
|
|
|
|
// The count will go up regardless of the room_id
|
|
|
|
|
// This is also the next_batch/since value
|
|
|
|
|
// Increment the last index and use that
|
|
|
|
|
let index = utils::u64_from_bytes( |
|
|
|
|
&self |
|
|
|
|
.db |
|
|
|
|
.pduid_pdu |
|
|
|
|
.update_and_fetch(b"n", utils::increment) |
|
|
|
|
.global |
|
|
|
|
.update_and_fetch(COUNTER, utils::increment) |
|
|
|
|
.unwrap() |
|
|
|
|
.unwrap(), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let mut pdu_id = vec![b'd']; |
|
|
|
|
pdu_id.extend_from_slice(room_id.to_string().as_bytes()); |
|
|
|
|
|
|
|
|
|
let mut pdu_id = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
pdu_id.push(0xff); // Add delimiter so we don't find rooms starting with the same id
|
|
|
|
|
pdu_id.extend_from_slice(&index.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
@ -564,7 +604,9 @@ impl Data {
@@ -564,7 +604,9 @@ impl Data {
|
|
|
|
|
.unwrap(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pdu.event_id |
|
|
|
|
self.room_read_set(&room_id, &sender, &pdu.event_id); |
|
|
|
|
|
|
|
|
|
Some(pdu.event_id) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns a vector of all PDUs in a room.
|
|
|
|
|
@ -573,12 +615,11 @@ impl Data {
@@ -573,12 +615,11 @@ impl Data {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn last_pdu_index(&self) -> u64 { |
|
|
|
|
let count_key: Vec<u8> = vec![b'n']; |
|
|
|
|
utils::u64_from_bytes( |
|
|
|
|
&self |
|
|
|
|
.db |
|
|
|
|
.pduid_pdu |
|
|
|
|
.get(&count_key) |
|
|
|
|
.global |
|
|
|
|
.get(&COUNTER) |
|
|
|
|
.unwrap() |
|
|
|
|
.unwrap_or_else(|| (&0_u64.to_be_bytes()).into()), |
|
|
|
|
) |
|
|
|
|
@ -586,15 +627,23 @@ impl Data {
@@ -586,15 +627,23 @@ impl Data {
|
|
|
|
|
|
|
|
|
|
/// Returns a vector of all events in a room that happened after the event with id `since`.
|
|
|
|
|
pub fn pdus_since(&self, room_id: &RoomId, since: u64) -> Vec<PduEvent> { |
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
|
let mut pdu_id = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
pdu_id.push(0xff); // Add delimiter so we don't find rooms starting with the same id
|
|
|
|
|
pdu_id.extend_from_slice(&(since).to_be_bytes()); |
|
|
|
|
|
|
|
|
|
self.pdus_since_pduid(room_id, pdu_id) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns a vector of all events in a room that happened after the event with id `since`.
|
|
|
|
|
pub fn pdus_since_pduid(&self, room_id: &RoomId, pdu_id: Vec<u8>) -> Vec<PduEvent> { |
|
|
|
|
let mut pdus = Vec::new(); |
|
|
|
|
|
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
|
let mut prefix = vec![b'd']; |
|
|
|
|
prefix.extend_from_slice(room_id.to_string().as_bytes()); |
|
|
|
|
let mut prefix = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id
|
|
|
|
|
|
|
|
|
|
let mut current = prefix.clone(); |
|
|
|
|
current.extend_from_slice(&since.to_be_bytes()); |
|
|
|
|
let mut current = pdu_id; |
|
|
|
|
|
|
|
|
|
while let Some((key, value)) = self.db.pduid_pdu.get_gt(¤t).unwrap() { |
|
|
|
|
if key.starts_with(&prefix) { |
|
|
|
|
@ -608,19 +657,18 @@ impl Data {
@@ -608,19 +657,18 @@ impl Data {
|
|
|
|
|
pdus |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn pdus_until(&self, room_id: &RoomId, until: u64) -> Vec<PduEvent> { |
|
|
|
|
pub fn pdus_until(&self, room_id: &RoomId, until: u64, max: u32) -> Vec<PduEvent> { |
|
|
|
|
let mut pdus = Vec::new(); |
|
|
|
|
|
|
|
|
|
// Create the first part of the full pdu id
|
|
|
|
|
let mut prefix = vec![b'd']; |
|
|
|
|
prefix.extend_from_slice(room_id.to_string().as_bytes()); |
|
|
|
|
let mut prefix = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id
|
|
|
|
|
|
|
|
|
|
let mut current = prefix.clone(); |
|
|
|
|
current.extend_from_slice(&until.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
while let Some((key, value)) = self.db.pduid_pdu.get_lt(¤t).unwrap() { |
|
|
|
|
if key.starts_with(&prefix) { |
|
|
|
|
if pdus.len() < max as usize && key.starts_with(&prefix) { |
|
|
|
|
current = key.to_vec(); |
|
|
|
|
pdus.push(serde_json::from_slice(&value).expect("pdu in db is valid")); |
|
|
|
|
} else { |
|
|
|
|
@ -670,8 +718,8 @@ impl Data {
@@ -670,8 +718,8 @@ impl Data {
|
|
|
|
|
let index = utils::u64_from_bytes( |
|
|
|
|
&self |
|
|
|
|
.db |
|
|
|
|
.pduid_pdu |
|
|
|
|
.update_and_fetch(b"n", utils::increment) |
|
|
|
|
.global |
|
|
|
|
.update_and_fetch(COUNTER, utils::increment) |
|
|
|
|
.unwrap() |
|
|
|
|
.unwrap(), |
|
|
|
|
); |
|
|
|
|
@ -695,17 +743,14 @@ impl Data {
@@ -695,17 +743,14 @@ impl Data {
|
|
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
let mut current = prefix.clone(); |
|
|
|
|
current.extend_from_slice(&since.to_be_bytes()); |
|
|
|
|
current.extend_from_slice(&(since + 1).to_be_bytes()); |
|
|
|
|
|
|
|
|
|
while let Some((key, value)) = self.db.roomlatestid_roomlatest.get_gt(¤t).unwrap() { |
|
|
|
|
if key.starts_with(&prefix) { |
|
|
|
|
current = key.to_vec(); |
|
|
|
|
room_latests.push( |
|
|
|
|
serde_json::from_slice::<EventJson<EduEvent>>(&value) |
|
|
|
|
.expect("room_latest in db is valid") |
|
|
|
|
.deserialize() |
|
|
|
|
.expect("room_latest in db is valid") |
|
|
|
|
.into(), |
|
|
|
|
.expect("room_latest in db is valid"), |
|
|
|
|
); |
|
|
|
|
} else { |
|
|
|
|
break; |
|
|
|
|
@ -715,6 +760,11 @@ impl Data {
@@ -715,6 +760,11 @@ impl Data {
|
|
|
|
|
room_latests |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`.
|
|
|
|
|
pub fn roomlatests_all(&self, room_id: &RoomId) -> Vec<EventJson<EduEvent>> { |
|
|
|
|
self.roomlatests_since(room_id, 0) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn roomactive_add(&self, event: EduEvent, room_id: &RoomId, timeout: u64) { |
|
|
|
|
let mut prefix = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
prefix.push(0xff); |
|
|
|
|
@ -737,8 +787,8 @@ impl Data {
@@ -737,8 +787,8 @@ impl Data {
|
|
|
|
|
let index = utils::u64_from_bytes( |
|
|
|
|
&self |
|
|
|
|
.db |
|
|
|
|
.pduid_pdu |
|
|
|
|
.update_and_fetch(b"n", utils::increment) |
|
|
|
|
.global |
|
|
|
|
.update_and_fetch(COUNTER, utils::increment) |
|
|
|
|
.unwrap() |
|
|
|
|
.unwrap(), |
|
|
|
|
); |
|
|
|
|
@ -790,10 +840,7 @@ impl Data {
@@ -790,10 +840,7 @@ impl Data {
|
|
|
|
|
current = key.to_vec(); |
|
|
|
|
room_actives.push( |
|
|
|
|
serde_json::from_slice::<EventJson<EduEvent>>(&value) |
|
|
|
|
.expect("room_active in db is valid") |
|
|
|
|
.deserialize() |
|
|
|
|
.expect("room_active in db is valid") |
|
|
|
|
.into(), |
|
|
|
|
.expect("room_active in db is valid"), |
|
|
|
|
); |
|
|
|
|
} else { |
|
|
|
|
break; |
|
|
|
|
@ -813,6 +860,158 @@ impl Data {
@@ -813,6 +860,158 @@ impl Data {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_userdata_update( |
|
|
|
|
&self, |
|
|
|
|
room_id: Option<&RoomId>, |
|
|
|
|
user_id: &UserId, |
|
|
|
|
event: EduEvent, |
|
|
|
|
) { |
|
|
|
|
let mut prefix = room_id |
|
|
|
|
.map(|r| r.to_string()) |
|
|
|
|
.unwrap_or_default() |
|
|
|
|
.as_bytes() |
|
|
|
|
.to_vec(); |
|
|
|
|
prefix.push(0xff); |
|
|
|
|
prefix.extend_from_slice(&user_id.to_string().as_bytes()); |
|
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
// Start with last
|
|
|
|
|
if let Some(mut current) = self |
|
|
|
|
.db |
|
|
|
|
.roomuserdataid_accountdata |
|
|
|
|
.scan_prefix(&prefix) |
|
|
|
|
.keys() |
|
|
|
|
.next_back() |
|
|
|
|
.map(|c| c.unwrap()) |
|
|
|
|
{ |
|
|
|
|
// Remove old entry (there should be at most one)
|
|
|
|
|
loop { |
|
|
|
|
if !current.starts_with(&prefix) { |
|
|
|
|
// We're in another room or user
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
if current.rsplit(|&b| b == 0xff).nth(2).unwrap() == user_id.to_string().as_bytes() |
|
|
|
|
{ |
|
|
|
|
// This is the old room_latest
|
|
|
|
|
self.db.roomuserdataid_accountdata.remove(current).unwrap(); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
// Else, try the event before that
|
|
|
|
|
if let Some((k, _)) = self.db.roomuserdataid_accountdata.get_lt(current).unwrap() { |
|
|
|
|
current = k; |
|
|
|
|
} else { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Increment the last index and use that
|
|
|
|
|
let index = utils::u64_from_bytes( |
|
|
|
|
&self |
|
|
|
|
.db |
|
|
|
|
.global |
|
|
|
|
.update_and_fetch(COUNTER, utils::increment) |
|
|
|
|
.unwrap() |
|
|
|
|
.unwrap(), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let mut key = prefix; |
|
|
|
|
key.extend_from_slice(&index.to_be_bytes()); |
|
|
|
|
|
|
|
|
|
let json = serde_json::to_value(&event).unwrap(); |
|
|
|
|
key.extend_from_slice(json["type"].as_str().unwrap().as_bytes()); |
|
|
|
|
|
|
|
|
|
self.db |
|
|
|
|
.roomuserdataid_accountdata |
|
|
|
|
.insert(key, &*json.to_string()) |
|
|
|
|
.unwrap(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_userdata_get( |
|
|
|
|
&self, |
|
|
|
|
room_id: Option<&RoomId>, |
|
|
|
|
user_id: &UserId, |
|
|
|
|
kind: &str, |
|
|
|
|
) -> Option<EventJson<EduEvent>> { |
|
|
|
|
self.room_userdata_all(room_id, user_id).remove(kind) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_userdata_since( |
|
|
|
|
&self, |
|
|
|
|
room_id: Option<&RoomId>, |
|
|
|
|
user_id: &UserId, |
|
|
|
|
since: u64, |
|
|
|
|
) -> HashMap<String, EventJson<EduEvent>> { |
|
|
|
|
let mut userdata = HashMap::new(); |
|
|
|
|
|
|
|
|
|
let mut prefix = room_id |
|
|
|
|
.map(|r| r.to_string()) |
|
|
|
|
.unwrap_or_default() |
|
|
|
|
.as_bytes() |
|
|
|
|
.to_vec(); |
|
|
|
|
prefix.push(0xff); |
|
|
|
|
prefix.extend_from_slice(&user_id.to_string().as_bytes()); |
|
|
|
|
prefix.push(0xff); |
|
|
|
|
|
|
|
|
|
let mut current = prefix.clone(); |
|
|
|
|
current.extend_from_slice(&(since + 1).to_be_bytes()); |
|
|
|
|
|
|
|
|
|
while let Some((key, value)) = self.db.roomuserdataid_accountdata.get_gt(¤t).unwrap() |
|
|
|
|
{ |
|
|
|
|
if key.starts_with(&prefix) { |
|
|
|
|
current = key.to_vec(); |
|
|
|
|
let json = serde_json::from_slice::<serde_json::Value>(&value).unwrap(); |
|
|
|
|
userdata.insert( |
|
|
|
|
json["type"].as_str().unwrap().to_owned(), |
|
|
|
|
serde_json::from_value::<EventJson<EduEvent>>(json) |
|
|
|
|
.expect("userdata in db is valid"), |
|
|
|
|
); |
|
|
|
|
} else { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
userdata |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_userdata_all( |
|
|
|
|
&self, |
|
|
|
|
room_id: Option<&RoomId>, |
|
|
|
|
user_id: &UserId, |
|
|
|
|
) -> HashMap<String, EventJson<EduEvent>> { |
|
|
|
|
self.room_userdata_since(room_id, user_id, 0) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_read_set( |
|
|
|
|
&self, |
|
|
|
|
room_id: &RoomId, |
|
|
|
|
user_id: &UserId, |
|
|
|
|
event_id: &EventId, |
|
|
|
|
) -> Option<()> { |
|
|
|
|
let mut key = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
key.push(0xff); |
|
|
|
|
key.extend_from_slice(&user_id.to_string().as_bytes()); |
|
|
|
|
|
|
|
|
|
self.db |
|
|
|
|
.roomuserid_lastread |
|
|
|
|
.insert(key, &self.pdu_get_count(event_id)?.to_be_bytes()) |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
Some(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn room_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Option<u64> { |
|
|
|
|
let mut key = room_id.to_string().as_bytes().to_vec(); |
|
|
|
|
key.push(0xff); |
|
|
|
|
key.extend_from_slice(&user_id.to_string().as_bytes()); |
|
|
|
|
|
|
|
|
|
self.db |
|
|
|
|
.roomuserid_lastread |
|
|
|
|
.get(key) |
|
|
|
|
.unwrap() |
|
|
|
|
.map(|v| utils::u64_from_bytes(&v)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn debug(&self) { |
|
|
|
|
self.db.debug(); |
|
|
|
|
} |
|
|
|
|
|