diff --git a/src/database/pusher.rs b/src/database/pusher.rs index c4f5801..60b21fc 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -1,11 +1,12 @@ -use crate::{Database, Error, PduEvent, Result}; +use crate::{database::globals::Globals, Database, Error, PduEvent, Result}; use log::{error, info, warn}; +use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ api::{ client::r0::push::{Pusher, PusherKind}, - push_gateway::send_event_notification::{ - self, - v1::{Device, Notification, NotificationCounts, NotificationPriority}, + push_gateway::send_event_notification::v1::{ + self as send_event_notification, Device, Notification, NotificationCounts, + NotificationPriority, }, OutgoingRequest, }, @@ -16,8 +17,9 @@ use ruma::{ }, events::EventType, push::{Action, PushCondition, PushFormat, Ruleset, Tweak}, - uint, UInt, UserId, + uint, EventId, RoomAliasId, RoomId, UInt, UserId, }; +use serde_json::value::RawValue as RawJsonValue; use std::{convert::TryFrom, fmt::Debug, time::Duration}; @@ -182,7 +184,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -202,8 +204,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -234,8 +235,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -260,8 +260,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -277,7 +276,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -315,8 +314,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -341,8 +339,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -358,7 +355,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -374,7 +371,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -390,7 +387,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -404,7 +401,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -418,7 +415,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pushers, tweaks, pdu, db).await?; break; } } @@ -435,7 +432,6 @@ async fn send_notice( tweaks: Vec, event: &PduEvent, db: &Database, - name: &str, ) -> Result<()> { let (http, _emails): (Vec<&Pusher>, _) = pushers .iter() @@ -445,6 +441,7 @@ async fn send_notice( // Two problems with this // 1. if "event_id_only" is the only format kind it seems we should never add more info // 2. can pusher/devices have conflicting formats + let mut outgoing = FuturesUnordered::new(); for pusher in http { let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); let url = if let Some(url) = pusher.data.url.as_ref() { @@ -465,13 +462,16 @@ async fn send_notice( device.tweaks = tweaks.clone(); } - let d = &[device]; - let mut notifi = Notification::new(d); + let devices = vec![device]; + let mut notifi = Notice { + devices, + prio: NotificationPriority::Low, + event_id: Some(event.event_id.clone()), + room_id: Some(event.room_id.clone()), + counts: NotificationCounts::new(unread, uint!(0)), + ..Default::default() + }; - notifi.prio = NotificationPriority::Low; - notifi.event_id = Some(&event.event_id); - notifi.room_id = Some(&event.room_id); - // TODO: missed calls notifi.counts = NotificationCounts::new(unread, uint!(0)); if event.kind == EventType::RoomEncrypted @@ -482,17 +482,9 @@ async fn send_notice( notifi.prio = NotificationPriority::High } - if event_id_only { - error!("SEND PUSH NOTICE `{}`", name); - // send_request( - // &db.globals, - // &url, - // send_event_notification::v1::Request::new(notifi), - // ) - // .await?; - } else { - notifi.sender = Some(&event.sender); - notifi.event_type = Some(&event.kind); + if !event_id_only { + notifi.sender = Some(event.sender.clone()); + notifi.event_type = Some(event.kind.clone()); notifi.content = serde_json::value::to_raw_value(&event.content).ok(); if event.kind == EventType::RoomMember { @@ -500,7 +492,7 @@ async fn send_notice( } let user_name = db.users.displayname(&event.sender)?; - notifi.sender_display_name = user_name.as_deref(); + notifi.sender_display_name = user_name; let room_name = db .rooms .room_state_get(&event.room_id, &EventType::RoomName, "")? @@ -509,20 +501,76 @@ async fn send_notice( _ => None, }) .flatten(); - notifi.room_name = room_name.as_deref(); - - error!("SEND PUSH NOTICE Full `{}`", name); - // send_request( - // &db.globals, - // &url, - // send_event_notification::v1::Request::new(notifi), - // ) - // .await?; + notifi.room_name = room_name; + + outgoing.push(send_helper(notifi, url, &db.globals)); + continue; } + + outgoing.push(send_helper(notifi, url, &db.globals)); } + loop { + match outgoing.next().await { + Some(Ok(_)) => continue, + Some(Err(_)) => return Err(Error::BadServerResponse("Server failed to respond")), + None => break, + } + } // TODO: email // for email in emails {} Ok(()) } + +#[derive(Debug, Default)] +struct Notice { + event_id: Option, + room_id: Option, + event_type: Option, + sender: Option, + sender_display_name: Option, + room_name: Option, + room_alias: Option, + user_is_target: bool, + prio: NotificationPriority, + content: Option>, + counts: NotificationCounts, + devices: Vec, +} + +async fn send_helper(notice: Notice, url: &str, globals: &Globals) -> Result<()> { + let Notice { + event_id, + room_id, + event_type, + sender, + sender_display_name, + room_alias, + room_name, + user_is_target, + prio, + counts, + content, + devices, + } = notice; + let mut outgoing = Notification::new(&devices); + outgoing.event_id = event_id.as_ref(); + outgoing.room_id = room_id.as_ref(); + outgoing.event_type = event_type.as_ref(); + outgoing.sender = sender.as_ref(); + outgoing.sender_display_name = sender_display_name.as_deref(); + outgoing.room_alias = room_alias.as_ref(); + outgoing.room_name = room_name.as_deref(); + outgoing.user_is_target = user_is_target; + outgoing.prio = prio; + outgoing.counts = counts; + outgoing.content = content; + send_request( + globals, + &url, + send_event_notification::Request::new(outgoing), + ) + .await?; + Ok(()) +}