Browse Source

Update tokio, ruma and rocket to use tokio:1.0

Remove js_int infavor of ruma export
merge-requests/18/head
Devin Ragotzy 5 years ago
parent
commit
b19e1ba9d9
  1. 898
      Cargo.lock
  2. 13
      Cargo.toml
  3. 78
      src/client_server/account.rs
  4. 2
      src/client_server/alias.rs
  5. 6
      src/client_server/directory.rs
  6. 6
      src/client_server/media.rs
  7. 67
      src/client_server/membership.rs
  8. 13
      src/client_server/message.rs
  9. 12
      src/client_server/profile.rs
  10. 6
      src/client_server/redact.rs
  11. 96
      src/client_server/room.rs
  12. 6
      src/client_server/state.rs
  13. 3
      src/client_server/sync.rs
  14. 2
      src/database.rs
  15. 6
      src/database/admin.rs
  16. 10
      src/database/globals.rs
  17. 144
      src/database/rooms.rs
  18. 3
      src/database/rooms/edus.rs
  19. 12
      src/database/sending.rs
  20. 3
      src/database/users.rs
  21. 142
      src/pdu.rs
  22. 11
      src/ruma_wrapper.rs
  23. 23
      src/server_server.rs

898
Cargo.lock generated

File diff suppressed because it is too large Load Diff

13
Cargo.toml

@ -14,7 +14,7 @@ edition = "2018"
[dependencies] [dependencies]
# Used to handle requests # Used to handle requests
# TODO: This can become optional as soon as proper configs are supported # TODO: This can become optional as soon as proper configs are supported
rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f336e5a172361fc1860461bb03667b1ed2", features = ["tls"] } # Used to handle requests rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "c24f15c18f02319be83af4f3c1951dc220b52c5e", features = ["tls"] } # Used to handle requests
#rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
# Used for matrix spec type definitions and helpers # Used for matrix spec type definitions and helpers
@ -24,11 +24,10 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-a
# Used when doing state resolution # Used when doing state resolution
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] }
state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } state-res = { git = "https://github.com/ruma/state-res", rev = "791c66d73cf064d09db0cdf767d5fef43a343425", features = ["unstable-pre-spec", "gen-eventid"] }#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
# Used for long polling and federation sender, should be the same as rocket::tokio # Used for long polling and federation sender, should be the same as rocket::tokio
tokio = { version = "0.2.23" } tokio = { version = "1.1.0", features = ["macros", "time", "sync"] }
# Used for storing data permanently # Used for storing data permanently
sled = { version = "0.34.6", default-features = false } sled = { version = "0.34.6", default-features = false }
# Used for emitting log entries # Used for emitting log entries
@ -37,8 +36,6 @@ log = "0.4.11"
http = "0.2.1" http = "0.2.1"
# Used to find data directory for default db path # Used to find data directory for default db path
directories = "3.0.1" directories = "3.0.1"
# Used for number types for ruma
js_int = "0.1.9"
# Used for ruma wrapper # Used for ruma wrapper
serde_json = { version = "1.0.60", features = ["raw_value"] } serde_json = { version = "1.0.60", features = ["raw_value"] }
# Used for appservice registration files # Used for appservice registration files
@ -52,7 +49,7 @@ rust-argon2 = "0.8.3"
# Used to send requests # Used to send requests
reqwest = "0.10.9" reqwest = "0.10.9"
# Used for conduit::Error type # Used for conduit::Error type
thiserror = "1.0.22" thiserror = "1.0.23"
# Used to generate thumbnails for images # Used to generate thumbnails for images
image = { version = "0.23.12", default-features = false, features = ["jpeg", "png", "gif"] } image = { version = "0.23.12", default-features = false, features = ["jpeg", "png", "gif"] }
# Used to encode server public key # Used to encode server public key
@ -60,7 +57,7 @@ base64 = "0.13.0"
# Used when hashing the state # Used when hashing the state
ring = "0.16.19" ring = "0.16.19"
# Used when querying the SRV record of other servers # Used when querying the SRV record of other servers
trust-dns-resolver = "0.19.6" trust-dns-resolver = "0.20.0"
# Used to find matching events for appservices # Used to find matching events for appservices
regex = "1.4.2" regex = "1.4.2"
# jwt jsonwebtokens # jwt jsonwebtokens

78
src/client_server/account.rs

@ -239,11 +239,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 2. Make conduit bot join // 2. Make conduit bot join
@ -264,11 +260,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 3. Power levels // 3. Power levels
@ -302,11 +294,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 4.1 Join Rules // 4.1 Join Rules
@ -323,11 +311,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 4.2 History Visibility // 4.2 History Visibility
@ -346,11 +330,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 4.3 Guest Access // 4.3 Guest Access
@ -367,11 +347,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 6. Events implied by name and topic // 6. Events implied by name and topic
@ -390,11 +366,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
@ -410,11 +382,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Room alias // Room alias
@ -436,11 +404,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?;
@ -463,11 +427,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -486,11 +446,7 @@ pub async fn register_route(
}, },
&user_id, &user_id,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Send welcome message // Send welcome message
@ -515,11 +471,7 @@ pub async fn register_route(
}, },
&conduit_user, &conduit_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }
@ -691,11 +643,7 @@ pub async fn deactivate_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }

2
src/client_server/alias.rs

@ -70,7 +70,7 @@ pub async fn get_alias_helper(
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
room_alias.server_name().to_owned(), room_alias.server_name(),
federation::query::get_room_information::v1::Request { room_alias }, federation::query::get_room_information::v1::Request { room_alias },
) )
.await?; .await?;

6
src/client_server/directory.rs

@ -21,7 +21,7 @@ use ruma::{
EventType, EventType,
}, },
serde::Raw, serde::Raw,
ServerName, ServerName, UInt,
}; };
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
@ -124,7 +124,7 @@ pub async fn get_room_visibility_route(
pub async fn get_public_rooms_filtered_helper( pub async fn get_public_rooms_filtered_helper(
db: &Database, db: &Database,
server: Option<&ServerName>, server: Option<&ServerName>,
limit: Option<js_int::UInt>, limit: Option<UInt>,
since: Option<&str>, since: Option<&str>,
filter: &IncomingFilter, filter: &IncomingFilter,
_network: &IncomingRoomNetwork, _network: &IncomingRoomNetwork,
@ -137,7 +137,7 @@ pub async fn get_public_rooms_filtered_helper(
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
other_server.to_owned(), other_server,
federation::directory::get_public_rooms_filtered::v1::Request { federation::directory::get_public_rooms_filtered::v1::Request {
limit, limit,
since: since.as_deref(), since: since.as_deref(),

6
src/client_server/media.rs

@ -77,7 +77,7 @@ pub async fn get_content_route(
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
body.server_name.clone(), &body.server_name,
get_content::Request { get_content::Request {
allow_remote: false, allow_remote: false,
server_name: &body.server_name, server_name: &body.server_name,
@ -126,12 +126,12 @@ pub async fn get_content_thumbnail_route(
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
body.server_name.clone(), &body.server_name,
get_content_thumbnail::Request { get_content_thumbnail::Request {
allow_remote: false, allow_remote: false,
height: body.height, height: body.height,
width: body.width, width: body.width,
method: body.method, method: body.method.clone(),
server_name: &body.server_name, server_name: &body.server_name,
media_id: &body.media_id, media_id: &body.media_id,
}, },

67
src/client_server/membership.rs

@ -21,7 +21,7 @@ use ruma::{
serde::{to_canonical_value, CanonicalJsonObject, Raw}, serde::{to_canonical_value, CanonicalJsonObject, Raw},
EventId, RoomId, RoomVersionId, ServerName, UserId, EventId, RoomId, RoomVersionId, ServerName, UserId,
}; };
use state_res::StateEvent; use state_res::Event;
use std::{ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
convert::TryFrom, convert::TryFrom,
@ -124,11 +124,7 @@ pub async fn leave_room_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;
@ -164,11 +160,7 @@ pub async fn invite_user_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;
@ -220,11 +212,7 @@ pub async fn kick_user_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;
@ -280,11 +268,7 @@ pub async fn ban_user_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;
@ -332,11 +316,7 @@ pub async fn unban_user_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;
@ -468,7 +448,7 @@ async fn join_room_by_id_helper(
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
remote_server.clone(), remote_server,
federation::membership::create_join_event_template::v1::Request { federation::membership::create_join_event_template::v1::Request {
room_id, room_id,
user_id: sender_user, user_id: sender_user,
@ -547,7 +527,7 @@ async fn join_room_by_id_helper(
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
remote_server.clone(), remote_server,
federation::membership::create_join_event::v2::Request { federation::membership::create_join_event::v2::Request {
room_id, room_id,
event_id: &event_id, event_id: &event_id,
@ -594,19 +574,19 @@ async fn join_room_by_id_helper(
.chain(iter::once(Ok((event_id, join_event)))) // Add join event we just created .chain(iter::once(Ok((event_id, join_event)))) // Add join event we just created
.map(|r| { .map(|r| {
let (event_id, value) = r?; let (event_id, value) = r?;
state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone()) PduEvent::from_id_val(&event_id, value.clone())
.map(|ev| (event_id, Arc::new(ev))) .map(|ev| (event_id, Arc::new(ev)))
.map_err(|e| { .map_err(|e| {
warn!("{:?}: {}", value, e); warn!("{:?}: {}", value, e);
Error::BadServerResponse("Invalid PDU in send_join response.") Error::BadServerResponse("Invalid PDU in send_join response.")
}) })
}) })
.collect::<Result<BTreeMap<EventId, Arc<StateEvent>>>>()?; .collect::<Result<BTreeMap<EventId, Arc<PduEvent>>>>()?;
let control_events = event_map let control_events = event_map
.values() .values()
.filter(|pdu| pdu.is_power_event()) .filter(|pdu| state_res::is_power_event(pdu))
.map(|pdu| pdu.event_id()) .map(|pdu| pdu.event_id.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// These events are not guaranteed to be sorted but they are resolved according to spec // These events are not guaranteed to be sorted but they are resolved according to spec
@ -618,7 +598,6 @@ async fn join_room_by_id_helper(
&room_id, &room_id,
&control_events, &control_events,
&mut event_map, &mut event_map,
&db.rooms,
&event_ids, &event_ids,
); );
@ -629,7 +608,6 @@ async fn join_room_by_id_helper(
&sorted_control_events, &sorted_control_events,
&BTreeMap::new(), // We have no "clean/resolved" events to add (these extend the `resolved_control_events`) &BTreeMap::new(), // We have no "clean/resolved" events to add (these extend the `resolved_control_events`)
&mut event_map, &mut event_map,
&db.rooms,
) )
.expect("iterative auth check failed on resolved events"); .expect("iterative auth check failed on resolved events");
@ -646,14 +624,14 @@ async fn join_room_by_id_helper(
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let power_level = resolved_control_events.get(&(EventType::RoomPowerLevels, "".into())); let power_level =
resolved_control_events.get(&(EventType::RoomPowerLevels, Some("".into())));
// Sort the remaining non control events // Sort the remaining non control events
let sorted_event_ids = state_res::StateResolution::mainline_sort( let sorted_event_ids = state_res::StateResolution::mainline_sort(
room_id, room_id,
&events_to_sort, &events_to_sort,
power_level, power_level,
&mut event_map, &mut event_map,
&db.rooms,
); );
let resolved_events = state_res::StateResolution::iterative_auth_check( let resolved_events = state_res::StateResolution::iterative_auth_check(
@ -662,7 +640,6 @@ async fn join_room_by_id_helper(
&sorted_event_ids, &sorted_event_ids,
&resolved_control_events, &resolved_control_events,
&mut event_map, &mut event_map,
&db.rooms,
) )
.expect("iterative auth check failed on resolved events"); .expect("iterative auth check failed on resolved events");
@ -685,17 +662,17 @@ async fn join_room_by_id_helper(
pdu_id.push(0xff); pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes()); pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_pdu( db.rooms.append_pdu(
&PduEvent::from(&**pdu), &pdu,
utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"), utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"),
count, count,
pdu_id.clone().into(), pdu_id.clone().into(),
&db.globals, &db,
&db.account_data,
&db.admin,
)?; )?;
if state_events.contains(ev_id) { if state_events.contains(ev_id) {
state.insert((pdu.kind(), pdu.state_key()), pdu_id); if let Some(key) = &pdu.state_key {
state.insert((pdu.kind(), key.to_string()), pdu_id);
}
} }
} }
@ -719,11 +696,7 @@ async fn join_room_by_id_helper(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }

13
src/client_server/message.rs

@ -8,7 +8,10 @@ use ruma::{
events::EventContent, events::EventContent,
EventId, EventId,
}; };
use std::convert::{TryFrom, TryInto}; use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
};
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
use rocket::{get, put}; use rocket::{get, put};
@ -46,7 +49,7 @@ pub async fn send_message_event_route(
return Ok(send_message_event::Response { event_id }.into()); return Ok(send_message_event::Response { event_id }.into());
} }
let mut unsigned = serde_json::Map::new(); let mut unsigned = BTreeMap::new();
unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into());
let event_id = db.rooms.build_and_append_pdu( let event_id = db.rooms.build_and_append_pdu(
@ -65,11 +68,7 @@ pub async fn send_message_event_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.transaction_ids.add_txnid( db.transaction_ids.add_txnid(

12
src/client_server/profile.rs

@ -63,11 +63,7 @@ pub async fn set_displayname_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Presence update // Presence update
@ -160,11 +156,7 @@ pub async fn set_avatar_url_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Presence update // Presence update

6
src/client_server/redact.rs

@ -31,11 +31,7 @@ pub async fn redact_event_route(
}, },
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;

96
src/client_server/room.rs

@ -65,11 +65,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 2. Let the room creator join // 2. Let the room creator join
@ -90,11 +86,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 3. Power levels // 3. Power levels
@ -135,11 +127,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 4. Events set by preset // 4. Events set by preset
@ -175,11 +163,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 4.2 History Visibility // 4.2 History Visibility
@ -196,11 +180,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 4.3 Guest Access // 4.3 Guest Access
@ -225,11 +205,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// 5. Events listed in initial_state // 5. Events listed in initial_state
@ -244,16 +220,8 @@ pub async fn create_room_route(
continue; continue;
} }
db.rooms.build_and_append_pdu( db.rooms
pdu_builder, .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db)?;
&sender_user,
&room_id,
&db.globals,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?;
} }
// 6. Events implied by name and topic // 6. Events implied by name and topic
@ -273,11 +241,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }
@ -295,11 +259,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }
@ -322,11 +282,7 @@ pub async fn create_room_route(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }
@ -413,11 +369,7 @@ pub async fn upgrade_room_route(
}, },
sender_user, sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Get the old room federations status // Get the old room federations status
@ -457,11 +409,7 @@ pub async fn upgrade_room_route(
}, },
sender_user, sender_user,
&replacement_room, &replacement_room,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Join the new room // Join the new room
@ -482,11 +430,7 @@ pub async fn upgrade_room_route(
}, },
sender_user, sender_user,
&replacement_room, &replacement_room,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
// Recommended transferable state events list from the specs // Recommended transferable state events list from the specs
@ -519,11 +463,7 @@ pub async fn upgrade_room_route(
}, },
sender_user, sender_user,
&replacement_room, &replacement_room,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
} }
@ -566,11 +506,7 @@ pub async fn upgrade_room_route(
}, },
sender_user, sender_user,
&body.room_id, &body.room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
db.flush().await?; db.flush().await?;

6
src/client_server/state.rs

@ -284,11 +284,7 @@ pub async fn send_state_event_for_key_helper(
}, },
&sender_user, &sender_user,
&room_id, &room_id,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
)?; )?;
Ok(event_id) Ok(event_id)

3
src/client_server/sync.rs

@ -698,7 +698,8 @@ pub async fn sync_events_route(
if duration.as_secs() > 30 { if duration.as_secs() > 30 {
duration = Duration::from_secs(30); duration = Duration::from_secs(30);
} }
let mut delay = tokio::time::delay_for(duration); let delay = tokio::time::sleep(duration);
tokio::pin!(delay);
tokio::select! { tokio::select! {
_ = &mut delay => {} _ = &mut delay => {}
_ = watcher => {} _ = watcher => {}

2
src/database.rs

@ -106,7 +106,7 @@ impl Database {
let (admin_sender, admin_receiver) = mpsc::unbounded(); let (admin_sender, admin_receiver) = mpsc::unbounded();
let db = Self { let db = Self {
globals: globals::Globals::load(db.open_tree("global")?, config).await?, globals: globals::Globals::load(db.open_tree("global")?, config)?,
users: users::Users { users: users::Users {
userid_password: db.open_tree("userid_password")?, userid_password: db.open_tree("userid_password")?,
userid_displayname: db.open_tree("userid_displayname")?, userid_displayname: db.open_tree("userid_displayname")?,

6
src/database/admin.rs

@ -60,11 +60,7 @@ impl Admin {
}, },
&conduit_user, &conduit_user,
&conduit_room, &conduit_room,
&db.globals, &db,
&db.sending,
&db.admin,
&db.account_data,
&db.appservice,
) )
.unwrap(); .unwrap();
} }

10
src/database/globals.rs

@ -21,7 +21,7 @@ pub struct Globals {
} }
impl Globals { impl Globals {
pub async fn load(globals: sled::Tree, config: Config) -> Result<Self> { pub fn load(globals: sled::Tree, config: Config) -> Result<Self> {
let bytes = &*globals let bytes = &*globals
.update_and_fetch("keypair", utils::generate_keypair)? .update_and_fetch("keypair", utils::generate_keypair)?
.expect("utils::generate_keypair always returns Some"); .expect("utils::generate_keypair always returns Some");
@ -73,11 +73,9 @@ impl Globals {
config, config,
keypair: Arc::new(keypair), keypair: Arc::new(keypair),
reqwest_client, reqwest_client,
dns_resolver: TokioAsyncResolver::tokio_from_system_conf() dns_resolver: TokioAsyncResolver::tokio_from_system_conf().map_err(|_| {
.await Error::bad_config("Failed to set up trust dns resolver with system config.")
.map_err(|_| { })?,
Error::bad_config("Failed to set up trust dns resolver with system config.")
})?,
actual_destination_cache: Arc::new(RwLock::new(HashMap::new())), actual_destination_cache: Arc::new(RwLock::new(HashMap::new())),
jwt_decoding_key, jwt_decoding_key,
}) })

144
src/database/rooms.rs

@ -2,7 +2,7 @@ mod edus;
pub use edus::RoomEdus; pub use edus::RoomEdus;
use crate::{pdu::PduBuilder, utils, Error, PduEvent, Result}; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use log::error; use log::error;
use regex::Regex; use regex::Regex;
use ring::digest; use ring::digest;
@ -20,7 +20,7 @@ use ruma::{
EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
}; };
use sled::IVec; use sled::IVec;
use state_res::{event_auth, Error as StateError, Requester, StateEvent, StateMap, StateStore}; use state_res::{event_auth, StateMap};
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
@ -67,44 +67,6 @@ pub struct Rooms {
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid) pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
} }
impl StateStore for Rooms {
fn get_event(
&self,
room_id: &RoomId,
event_id: &EventId,
) -> state_res::Result<Arc<StateEvent>> {
let pid = self
.get_pdu_id(event_id)
.map_err(StateError::custom)?
.ok_or_else(|| {
StateError::NotFound(format!(
"PDU via room_id and event_id not found in the db: {}",
event_id.as_str()
))
})?;
serde_json::from_slice(
&self
.pduid_pdu
.get(pid)
.map_err(StateError::custom)?
.ok_or_else(|| StateError::NotFound("PDU via pduid not found in db.".into()))?,
)
.map_err(Into::into)
.and_then(|pdu: StateEvent| {
// conduit's PDU's always contain a room_id but some
// of ruma's do not so this must be an Option
if pdu.room_id() == room_id {
Ok(Arc::new(pdu))
} else {
Err(StateError::NotFound(
"Found PDU for incorrect room in db.".into(),
))
}
})
}
}
impl Rooms { impl Rooms {
/// Builds a StateMap by iterating over all keys that start /// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash. /// with state_hash, this gives the full state for the given state_hash.
@ -112,7 +74,7 @@ impl Rooms {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
state_hash: &StateHashId, state_hash: &StateHashId,
) -> Result<StateMap<PduEvent>> { ) -> Result<BTreeMap<(EventType, String), PduEvent>> {
self.stateid_pduid self.stateid_pduid
.scan_prefix(&state_hash) .scan_prefix(&state_hash)
.values() .values()
@ -141,7 +103,7 @@ impl Rooms {
pdu, pdu,
)) ))
}) })
.collect::<Result<StateMap<_>>>() .collect()
} }
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
@ -181,7 +143,7 @@ impl Rooms {
))) )))
}) })
} else { } else {
return Ok(None); Ok(None)
} }
} }
@ -205,7 +167,7 @@ impl Rooms {
content: serde_json::Value, content: serde_json::Value,
) -> Result<StateMap<PduEvent>> { ) -> Result<StateMap<PduEvent>> {
let auth_events = state_res::auth_types_for_event( let auth_events = state_res::auth_types_for_event(
kind.clone(), &kind,
sender, sender,
state_key.map(|s| s.to_string()), state_key.map(|s| s.to_string()),
content, content,
@ -213,7 +175,13 @@ impl Rooms {
let mut events = StateMap::new(); let mut events = StateMap::new();
for (event_type, state_key) in auth_events { for (event_type, state_key) in auth_events {
if let Some((_, pdu)) = self.room_state_get(room_id, &event_type, &state_key)? { if let Some((_, pdu)) = self.room_state_get(
room_id,
&event_type,
&state_key
.as_deref()
.ok_or_else(|| Error::Conflict("Found a non state event in auth events"))?,
)? {
events.insert((event_type, state_key), pdu); events.insert((event_type, state_key), pdu);
} }
} }
@ -290,7 +258,10 @@ impl Rooms {
} }
/// Returns the full room state. /// Returns the full room state.
pub fn room_state_full(&self, room_id: &RoomId) -> Result<StateMap<PduEvent>> { pub fn room_state_full(
&self,
room_id: &RoomId,
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? { if let Some(current_state_hash) = self.current_state_hash(room_id)? {
self.state_full(&room_id, &current_state_hash) self.state_full(&room_id, &current_state_hash)
} else { } else {
@ -448,9 +419,7 @@ impl Rooms {
mut pdu_json: CanonicalJsonObject, mut pdu_json: CanonicalJsonObject,
count: u64, count: u64,
pdu_id: IVec, pdu_id: IVec,
globals: &super::globals::Globals, db: &Database,
account_data: &super::account_data::AccountData,
admin: &super::admin::Admin,
) -> Result<()> { ) -> Result<()> {
// Make unsigned fields correct. This is not properly documented in the spec, but state // Make unsigned fields correct. This is not properly documented in the spec, but state
// events need to have previous content in the unsigned field, so clients can easily // events need to have previous content in the unsigned field, so clients can easily
@ -484,7 +453,7 @@ impl Rooms {
// Mark as read first so the sending client doesn't get a notification even if appending // Mark as read first so the sending client doesn't get a notification even if appending
// fails // fails
self.edus self.edus
.private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?; .private_read_set(&pdu.room_id, &pdu.sender, count, &db.globals)?;
self.pduid_pdu.insert( self.pduid_pdu.insert(
&pdu_id, &pdu_id,
@ -522,8 +491,8 @@ impl Rooms {
) )
})?, })?,
&pdu.sender, &pdu.sender,
account_data, &db.account_data,
globals, &db.globals,
)?; )?;
} }
} }
@ -541,10 +510,10 @@ impl Rooms {
self.tokenids.insert(key, &[])?; self.tokenids.insert(key, &[])?;
} }
if body.starts_with(&format!("@conduit:{}: ", globals.server_name())) if body.starts_with(&format!("@conduit:{}: ", db.globals.server_name()))
&& self && self
.id_from_alias( .id_from_alias(
&format!("#admins:{}", globals.server_name()) &format!("#admins:{}", db.globals.server_name())
.try_into() .try_into()
.expect("#admins:server_name is a valid room alias"), .expect("#admins:server_name is a valid room alias"),
)? )?
@ -571,10 +540,11 @@ impl Rooms {
); );
match parsed_config { match parsed_config {
Ok(yaml) => { Ok(yaml) => {
admin.send(AdminCommand::RegisterAppservice(yaml)); db.admin
.send(AdminCommand::RegisterAppservice(yaml));
} }
Err(e) => { Err(e) => {
admin.send(AdminCommand::SendMessage( db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain( message::MessageEventContent::text_plain(
format!( format!(
"Could not parse appservice config: {}", "Could not parse appservice config: {}",
@ -585,7 +555,7 @@ impl Rooms {
} }
} }
} else { } else {
admin.send(AdminCommand::SendMessage( db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain( message::MessageEventContent::text_plain(
"Expected code block in command body.", "Expected code block in command body.",
), ),
@ -593,10 +563,10 @@ impl Rooms {
} }
} }
"list_appservices" => { "list_appservices" => {
admin.send(AdminCommand::ListAppservices); db.admin.send(AdminCommand::ListAppservices);
} }
_ => { _ => {
admin.send(AdminCommand::SendMessage( db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(format!( message::MessageEventContent::text_plain(format!(
"Command: {}, Args: {:?}", "Command: {}, Args: {:?}",
command, args command, args
@ -703,11 +673,7 @@ impl Rooms {
pdu_builder: PduBuilder, pdu_builder: PduBuilder,
sender: &UserId, sender: &UserId,
room_id: &RoomId, room_id: &RoomId,
globals: &super::globals::Globals, db: &Database,
sending: &super::sending::Sending,
admin: &super::admin::Admin,
account_data: &super::account_data::AccountData,
appservice: &super::appservice::Appservice,
) -> Result<EventId> { ) -> Result<EventId> {
let PduBuilder { let PduBuilder {
event_type, event_type,
@ -790,7 +756,7 @@ impl Rooms {
if !match event_type { if !match event_type {
EventType::RoomEncryption => { EventType::RoomEncryption => {
// Only allow encryption events if it's allowed in the config // Only allow encryption events if it's allowed in the config
globals.allow_encryption() db.globals.allow_encryption()
} }
EventType::RoomMember => { EventType::RoomMember => {
let prev_event = self let prev_event = self
@ -798,23 +764,17 @@ impl Rooms {
ErrorKind::Unknown, ErrorKind::Unknown,
"Membership can't be the first event", "Membership can't be the first event",
))?)? ))?)?
.map(|pdu| pdu.convert_for_state_res()); .map(Arc::new);
event_auth::valid_membership_change( event_auth::valid_membership_change(
// TODO this is a bit of a hack but not sure how to have a type Some(state_key.as_str()),
// declared in `state_res` crate easily convert to/from conduit::PduEvent sender,
Requester { content.clone(),
prev_event_ids: prev_events.to_owned(),
room_id: &room_id,
content: &content,
state_key: Some(state_key.to_owned()),
sender: &sender,
},
prev_event, prev_event,
None, // TODO: third party invite None, // TODO: third party invite
&auth_events &auth_events
.iter() .iter()
.map(|((ty, key), pdu)| { .map(|((ty, key), pdu)| {
Ok(((ty.clone(), key.clone()), pdu.convert_for_state_res())) Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone())))
}) })
.collect::<Result<StateMap<_>>>()?, .collect::<Result<StateMap<_>>>()?,
) )
@ -902,13 +862,13 @@ impl Rooms {
// Add origin because synapse likes that (and it's required in the spec) // Add origin because synapse likes that (and it's required in the spec)
pdu_json.insert( pdu_json.insert(
"origin".to_owned(), "origin".to_owned(),
to_canonical_value(globals.server_name()) to_canonical_value(db.globals.server_name())
.expect("server name is a valid CanonicalJsonValue"), .expect("server name is a valid CanonicalJsonValue"),
); );
ruma::signatures::hash_and_sign_event( ruma::signatures::hash_and_sign_event(
globals.server_name().as_str(), db.globals.server_name().as_str(),
globals.keypair(), db.globals.keypair(),
&mut pdu_json, &mut pdu_json,
&RoomVersionId::Version6, &RoomVersionId::Version6,
) )
@ -929,24 +889,16 @@ impl Rooms {
// Increment the last index and use that // Increment the last index and use that
// This is also the next_batch/since value // This is also the next_batch/since value
let count = globals.next_count()?; let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec(); let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff); pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes()); pdu_id.extend_from_slice(&count.to_be_bytes());
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = self.append_to_state(&pdu_id, &pdu, &globals)?; let statehashid = self.append_to_state(&pdu_id, &pdu, &db.globals)?;
self.append_pdu( self.append_pdu(&pdu, pdu_json, count, pdu_id.clone().into(), db)?;
&pdu,
pdu_json,
count,
pdu_id.clone().into(),
globals,
account_data,
admin,
)?;
// We set the room state after inserting the pdu, so that we never have a moment in time // We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist // where events in the current room state do not exist
@ -955,12 +907,12 @@ impl Rooms {
for server in self for server in self
.room_servers(room_id) .room_servers(room_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.filter(|server| &**server != globals.server_name()) .filter(|server| &**server != db.globals.server_name())
{ {
sending.send_pdu(&server, &pdu_id)?; db.sending.send_pdu(&server, &pdu_id)?;
} }
for appservice in appservice.iter_all().filter_map(|r| r.ok()) { for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
if let Some(namespaces) = appservice.1.get("namespaces") { if let Some(namespaces) = appservice.1.get("namespaces") {
let users = namespaces let users = namespaces
.get("users") .get("users")
@ -996,7 +948,7 @@ impl Rooms {
.get("sender_localpart") .get("sender_localpart")
.and_then(|string| string.as_str()) .and_then(|string| string.as_str())
.and_then(|string| { .and_then(|string| {
UserId::parse_with_server_name(string, globals.server_name()).ok() UserId::parse_with_server_name(string, db.globals.server_name()).ok()
}); });
if bridge_user_id.map_or(false, |bridge_user_id| { if bridge_user_id.map_or(false, |bridge_user_id| {
@ -1018,7 +970,7 @@ impl Rooms {
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) .any(|member| users.iter().any(|regex| regex.is_match(member.as_str())))
{ {
sending.send_pdu_appservice(&appservice.0, &pdu_id)?; db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
} }
} }
} }

3
src/database/rooms/edus.rs

@ -1,5 +1,4 @@
use crate::{utils, Error, Result}; use crate::{utils, Error, Result};
use js_int::UInt;
use ruma::{ use ruma::{
events::{ events::{
presence::{PresenceEvent, PresenceEventContent}, presence::{PresenceEvent, PresenceEventContent},
@ -7,7 +6,7 @@ use ruma::{
}, },
presence::PresenceState, presence::PresenceState,
serde::Raw, serde::Raw,
RoomId, UserId, RoomId, UInt, UserId,
}; };
use std::{ use std::{
collections::HashMap, collections::HashMap,

12
src/database/sending.rs

@ -169,8 +169,13 @@ impl Sending {
(last_failed.0+1, Instant::now()) (last_failed.0+1, Instant::now())
}, },
None => { None => {
(1, Instant::now())
}
});
servercurrentpdus.remove(&prefix).unwrap(); servercurrentpdus.remove(&prefix).unwrap();
};
}
}
}, },
Some(event) = &mut subscriber => { Some(event) = &mut subscriber => {
if let sled::Event::Insert { key, .. } = event { if let sled::Event::Insert { key, .. } = event {
@ -255,7 +260,7 @@ impl Sending {
); );
} }
} }
} },
} }
} }
}); });
@ -517,12 +522,11 @@ impl Sending {
) )
}) })
} }
pub async fn send_federation_request<T: OutgoingRequest>( pub async fn send_federation_request<T: OutgoingRequest>(
&self, &self,
globals: &crate::database::globals::Globals, globals: &crate::database::globals::Globals,
destination: Box<ServerName>, destination: &ServerName,
request: T, request: T,
) -> Result<T::IncomingResponse> ) -> Result<T::IncomingResponse>
where where

3
src/database/users.rs

@ -1,5 +1,4 @@
use crate::{utils, Error, Result}; use crate::{utils, Error, Result};
use js_int::UInt;
use ruma::{ use ruma::{
api::client::{ api::client::{
error::ErrorKind, error::ErrorKind,
@ -11,7 +10,7 @@ use ruma::{
encryption::DeviceKeys, encryption::DeviceKeys,
events::{AnyToDeviceEvent, EventType}, events::{AnyToDeviceEvent, EventType},
serde::Raw, serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, UserId, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, UInt, UserId,
}; };
use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime};

142
src/pdu.rs

@ -1,23 +1,17 @@
use crate::Error; use crate::Error;
use js_int::UInt;
use ruma::{ use ruma::{
events::{ events::{
pdu::EventHash, room::member::MemberEventContent, AnyEvent, AnyRoomEvent, AnyStateEvent, pdu::EventHash, room::member::MemberEventContent, AnyEvent, AnyRoomEvent, AnyStateEvent,
AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent, AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent,
}, },
serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw}, serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw},
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UInt, UserId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use std::{ use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH};
collections::BTreeMap,
convert::{TryFrom, TryInto},
sync::Arc,
time::UNIX_EPOCH,
};
#[derive(Deserialize, Serialize, Debug)] #[derive(Clone, Deserialize, Serialize, Debug)]
pub struct PduEvent { pub struct PduEvent {
pub event_id: EventId, pub event_id: EventId,
pub room_id: RoomId, pub room_id: RoomId,
@ -33,8 +27,8 @@ pub struct PduEvent {
pub auth_events: Vec<EventId>, pub auth_events: Vec<EventId>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub redacts: Option<EventId>, pub redacts: Option<EventId>,
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")] #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub unsigned: serde_json::Map<String, serde_json::Value>, pub unsigned: BTreeMap<String, serde_json::Value>,
pub hashes: EventHash, pub hashes: EventHash,
pub signatures: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>, pub signatures: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>,
} }
@ -227,61 +221,85 @@ impl PduEvent {
) )
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
} }
pub fn from_id_val(
event_id: &EventId,
mut json: CanonicalJsonObject,
) -> Result<Self, serde_json::Error> {
json.insert(
"event_id".to_string(),
ruma::serde::to_canonical_value(event_id).expect("event_id is a valid Value"),
);
serde_json::from_value(serde_json::to_value(json).expect("valid JSON"))
}
} }
impl From<&state_res::StateEvent> for PduEvent { impl state_res::Event for PduEvent {
fn from(pdu: &state_res::StateEvent) -> Self { fn event_id(&self) -> &EventId {
Self { &self.event_id
event_id: pdu.event_id(), }
room_id: pdu.room_id().clone(),
sender: pdu.sender().clone(), fn room_id(&self) -> &RoomId {
origin_server_ts: (pdu &self.room_id
.origin_server_ts() }
.duration_since(UNIX_EPOCH)
.expect("time is valid") fn sender(&self) -> &UserId {
.as_millis() as u64) &self.sender
.try_into() }
.expect("time is valid"), fn kind(&self) -> EventType {
kind: pdu.kind(), self.kind.clone()
content: pdu.content().clone(), }
state_key: Some(pdu.state_key()),
prev_events: pdu.prev_event_ids(), fn content(&self) -> serde_json::Value {
depth: *pdu.depth(), self.content.clone()
auth_events: pdu.auth_events(), }
redacts: pdu.redacts().cloned(), fn origin_server_ts(&self) -> std::time::SystemTime {
unsigned: pdu.unsigned().clone().into_iter().collect(), UNIX_EPOCH + std::time::Duration::from_millis(self.origin_server_ts.into())
hashes: pdu.hashes().clone(), }
signatures: pdu.signatures(),
} fn state_key(&self) -> Option<String> {
self.state_key.clone()
}
fn prev_events(&self) -> Vec<EventId> {
self.prev_events.to_vec()
}
fn depth(&self) -> &UInt {
&self.depth
}
fn auth_events(&self) -> Vec<EventId> {
self.auth_events.to_vec()
}
fn redacts(&self) -> Option<&EventId> {
self.redacts.as_ref()
}
fn hashes(&self) -> &EventHash {
&self.hashes
}
fn signatures(&self) -> BTreeMap<Box<ServerName>, BTreeMap<ruma::ServerSigningKeyId, String>> {
self.signatures.clone()
}
fn unsigned(&self) -> &BTreeMap<String, serde_json::Value> {
&self.unsigned
} }
} }
impl PduEvent { // These impl's allow us to dedup state snapshots when resolving state
pub fn convert_for_state_res(&self) -> Arc<state_res::StateEvent> { // for incoming events (federation/send/{txn}).
Arc::new( impl Eq for PduEvent {}
// For consistency of eventId (just in case) we use the one impl PartialEq for PduEvent {
// generated by conduit for everything. fn eq(&self, other: &Self) -> bool {
state_res::StateEvent::from_id_value( self.event_id == other.event_id
self.event_id.clone(), }
json!({ }
"event_id": self.event_id, impl PartialOrd for PduEvent {
"room_id": self.room_id, fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
"sender": self.sender, self.event_id.partial_cmp(&other.event_id)
"origin_server_ts": self.origin_server_ts, }
"type": self.kind, }
"content": self.content, impl Ord for PduEvent {
"state_key": self.state_key, fn cmp(&self, other: &Self) -> Ordering {
"prev_events": self.prev_events, self.event_id.cmp(&other.event_id)
"depth": self.depth,
"auth_events": self.auth_events,
"redacts": self.redacts,
"unsigned": self.unsigned,
"hashes": self.hashes,
"signatures": self.signatures,
}),
)
.expect("all conduit PDUs are state events"),
)
} }
} }
@ -315,7 +333,7 @@ pub struct PduBuilder {
#[serde(rename = "type")] #[serde(rename = "type")]
pub event_type: EventType, pub event_type: EventType,
pub content: serde_json::Value, pub content: serde_json::Value,
pub unsigned: Option<serde_json::Map<String, serde_json::Value>>, pub unsigned: Option<BTreeMap<String, serde_json::Value>>,
pub state_key: Option<String>, pub state_key: Option<String>,
pub redacts: Option<EventId>, pub redacts: Option<EventId>,
} }

11
src/ruma_wrapper.rs

@ -1,6 +1,6 @@
use crate::Error; use crate::Error;
use ruma::{ use ruma::{
api::{AuthScheme, OutgoingRequest}, api::{AuthScheme, IncomingRequest, OutgoingRequest},
identifiers::{DeviceId, UserId}, identifiers::{DeviceId, UserId},
Outgoing, Outgoing,
}; };
@ -40,10 +40,7 @@ pub struct Ruma<T: Outgoing> {
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
impl<'a, T: Outgoing + OutgoingRequest> FromTransformedData<'a> for Ruma<T> impl<'a, T: Outgoing + OutgoingRequest> FromTransformedData<'a> for Ruma<T>
where where
<T as Outgoing>::Incoming: TryFrom<http::request::Request<std::vec::Vec<u8>>> + std::fmt::Debug, T::Incoming: IncomingRequest,
<<T as Outgoing>::Incoming as std::convert::TryFrom<
http::request::Request<std::vec::Vec<u8>>,
>>::Error: std::fmt::Debug,
{ {
type Error = (); type Error = ();
type Owned = Data; type Owned = Data;
@ -153,7 +150,7 @@ where
let http_request = http_request.body(body.clone()).unwrap(); let http_request = http_request.body(body.clone()).unwrap();
debug!("{:?}", http_request); debug!("{:?}", http_request);
match <T as Outgoing>::Incoming::try_from(http_request) { match <T::Incoming as IncomingRequest>::try_from_http_request(http_request) {
Ok(t) => Success(Ruma { Ok(t) => Success(Ruma {
body: t, body: t,
sender_user, sender_user,
@ -166,7 +163,7 @@ where
}), }),
Err(e) => { Err(e) => {
warn!("{:?}", e); warn!("{:?}", e);
Failure((Status::raw(583), ())) Failure((Status::BadRequest, ()))
} }
} }
}) })

23
src/server_server.rs

@ -10,7 +10,7 @@ use ruma::{
get_server_keys, get_server_version::v1 as get_server_version, ServerSigningKeys, get_server_keys, get_server_version::v1 as get_server_version, ServerSigningKeys,
VerifyKey, VerifyKey,
}, },
event::{get_event, get_missing_events, get_room_state_ids}, event::get_missing_events,
query::get_profile_information::{self, v1::ProfileField}, query::get_profile_information::{self, v1::ProfileField},
transactions::send_transaction_message, transactions::send_transaction_message,
}, },
@ -29,7 +29,7 @@ use std::{
pub async fn send_request<T: OutgoingRequest>( pub async fn send_request<T: OutgoingRequest>(
globals: &crate::database::globals::Globals, globals: &crate::database::globals::Globals,
destination: Box<ServerName>, destination: &ServerName,
request: T, request: T,
) -> Result<T::IncomingResponse> ) -> Result<T::IncomingResponse>
where where
@ -43,18 +43,18 @@ where
.actual_destination_cache .actual_destination_cache
.read() .read()
.unwrap() .unwrap()
.get(&destination) .get(destination)
.cloned(); .cloned();
let (actual_destination, host) = if let Some(result) = maybe_result { let (actual_destination, host) = if let Some(result) = maybe_result {
result result
} else { } else {
let result = find_actual_destination(globals, &destination).await; let result = find_actual_destination(globals, destination).await;
globals globals
.actual_destination_cache .actual_destination_cache
.write() .write()
.unwrap() .unwrap()
.insert(destination.clone(), result.clone()); .insert(Box::<ServerName>::from(destination), result.clone());
result result
}; };
@ -215,7 +215,7 @@ fn add_port_to_hostname(destination_str: String) -> String {
/// Numbers in comments below refer to bullet points in linked section of specification /// Numbers in comments below refer to bullet points in linked section of specification
pub(crate) async fn find_actual_destination( pub(crate) async fn find_actual_destination(
globals: &crate::database::globals::Globals, globals: &crate::database::globals::Globals,
destination: &Box<ServerName>, destination: &ServerName,
) -> (String, Option<String>) { ) -> (String, Option<String>) {
let mut host = None; let mut host = None;
@ -558,15 +558,8 @@ pub async fn send_transaction_message_route<'a>(
let next_room_state = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; let next_room_state = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
db.rooms.append_pdu( db.rooms
&pdu, .append_pdu(&pdu, value, count, pdu_id.clone().into(), &db)?;
value,
count,
pdu_id.clone().into(),
&db.globals,
&db.account_data,
&db.admin,
)?;
db.rooms.set_room_state(&room_id, &next_room_state)?; db.rooms.set_room_state(&room_id, &next_room_state)?;

Loading…
Cancel
Save