Browse Source

Batch sending push notifications to the gateway, cleanup

merge-requests/18/head
Devin Ragotzy 5 years ago
parent
commit
b94dbf2544
  1. 148
      src/database/pusher.rs

148
src/database/pusher.rs

@ -1,11 +1,12 @@
use crate::{Database, Error, PduEvent, Result}; use crate::{database::globals::Globals, Database, Error, PduEvent, Result};
use log::{error, info, warn}; use log::{error, info, warn};
use rocket::futures::stream::{FuturesUnordered, StreamExt};
use ruma::{ use ruma::{
api::{ api::{
client::r0::push::{Pusher, PusherKind}, client::r0::push::{Pusher, PusherKind},
push_gateway::send_event_notification::{ push_gateway::send_event_notification::v1::{
self, self as send_event_notification, Device, Notification, NotificationCounts,
v1::{Device, Notification, NotificationCounts, NotificationPriority}, NotificationPriority,
}, },
OutgoingRequest, OutgoingRequest,
}, },
@ -16,8 +17,9 @@ use ruma::{
}, },
events::EventType, events::EventType,
push::{Action, PushCondition, PushFormat, Ruleset, Tweak}, push::{Action, PushCondition, PushFormat, Ruleset, Tweak},
uint, UInt, UserId, uint, EventId, RoomAliasId, RoomId, UInt, UserId,
}; };
use serde_json::value::RawValue as RawJsonValue;
use std::{convert::TryFrom, fmt::Debug, time::Duration}; use std::{convert::TryFrom, fmt::Debug, time::Duration};
@ -182,7 +184,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -202,8 +204,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) send_notice(unread, pushers, tweaks, pdu, db).await?;
.await?;
break; break;
} }
} }
@ -234,8 +235,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) send_notice(unread, pushers, tweaks, pdu, db).await?;
.await?;
break; break;
} }
} }
@ -260,8 +260,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) send_notice(unread, pushers, tweaks, pdu, db).await?;
.await?;
break; break;
} }
} }
@ -277,7 +276,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -315,8 +314,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) send_notice(unread, pushers, tweaks, pdu, db).await?;
.await?;
break; break;
} }
} }
@ -341,8 +339,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) send_notice(unread, pushers, tweaks, pdu, db).await?;
.await?;
break; break;
} }
} }
@ -358,7 +355,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -374,7 +371,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -390,7 +387,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -404,7 +401,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -418,7 +415,7 @@ pub async fn send_push_notice(
_ => None, _ => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; send_notice(unread, pushers, tweaks, pdu, db).await?;
break; break;
} }
} }
@ -435,7 +432,6 @@ async fn send_notice(
tweaks: Vec<Tweak>, tweaks: Vec<Tweak>,
event: &PduEvent, event: &PduEvent,
db: &Database, db: &Database,
name: &str,
) -> Result<()> { ) -> Result<()> {
let (http, _emails): (Vec<&Pusher>, _) = pushers let (http, _emails): (Vec<&Pusher>, _) = pushers
.iter() .iter()
@ -445,6 +441,7 @@ async fn send_notice(
// Two problems with this // Two problems with this
// 1. if "event_id_only" is the only format kind it seems we should never add more info // 1. if "event_id_only" is the only format kind it seems we should never add more info
// 2. can pusher/devices have conflicting formats // 2. can pusher/devices have conflicting formats
let mut outgoing = FuturesUnordered::new();
for pusher in http { for pusher in http {
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
let url = if let Some(url) = pusher.data.url.as_ref() { let url = if let Some(url) = pusher.data.url.as_ref() {
@ -465,13 +462,16 @@ async fn send_notice(
device.tweaks = tweaks.clone(); device.tweaks = tweaks.clone();
} }
let d = &[device]; let devices = vec![device];
let mut notifi = Notification::new(d); let mut notifi = Notice {
devices,
prio: NotificationPriority::Low,
event_id: Some(event.event_id.clone()),
room_id: Some(event.room_id.clone()),
counts: NotificationCounts::new(unread, uint!(0)),
..Default::default()
};
notifi.prio = NotificationPriority::Low;
notifi.event_id = Some(&event.event_id);
notifi.room_id = Some(&event.room_id);
// TODO: missed calls
notifi.counts = NotificationCounts::new(unread, uint!(0)); notifi.counts = NotificationCounts::new(unread, uint!(0));
if event.kind == EventType::RoomEncrypted if event.kind == EventType::RoomEncrypted
@ -482,17 +482,9 @@ async fn send_notice(
notifi.prio = NotificationPriority::High notifi.prio = NotificationPriority::High
} }
if event_id_only { if !event_id_only {
error!("SEND PUSH NOTICE `{}`", name); notifi.sender = Some(event.sender.clone());
// send_request( notifi.event_type = Some(event.kind.clone());
// &db.globals,
// &url,
// send_event_notification::v1::Request::new(notifi),
// )
// .await?;
} else {
notifi.sender = Some(&event.sender);
notifi.event_type = Some(&event.kind);
notifi.content = serde_json::value::to_raw_value(&event.content).ok(); notifi.content = serde_json::value::to_raw_value(&event.content).ok();
if event.kind == EventType::RoomMember { if event.kind == EventType::RoomMember {
@ -500,7 +492,7 @@ async fn send_notice(
} }
let user_name = db.users.displayname(&event.sender)?; let user_name = db.users.displayname(&event.sender)?;
notifi.sender_display_name = user_name.as_deref(); notifi.sender_display_name = user_name;
let room_name = db let room_name = db
.rooms .rooms
.room_state_get(&event.room_id, &EventType::RoomName, "")? .room_state_get(&event.room_id, &EventType::RoomName, "")?
@ -509,20 +501,76 @@ async fn send_notice(
_ => None, _ => None,
}) })
.flatten(); .flatten();
notifi.room_name = room_name.as_deref(); notifi.room_name = room_name;
error!("SEND PUSH NOTICE Full `{}`", name); outgoing.push(send_helper(notifi, url, &db.globals));
// send_request( continue;
// &db.globals,
// &url,
// send_event_notification::v1::Request::new(notifi),
// )
// .await?;
} }
outgoing.push(send_helper(notifi, url, &db.globals));
} }
loop {
match outgoing.next().await {
Some(Ok(_)) => continue,
Some(Err(_)) => return Err(Error::BadServerResponse("Server failed to respond")),
None => break,
}
}
// TODO: email // TODO: email
// for email in emails {} // for email in emails {}
Ok(()) Ok(())
} }
#[derive(Debug, Default)]
struct Notice {
event_id: Option<EventId>,
room_id: Option<RoomId>,
event_type: Option<EventType>,
sender: Option<UserId>,
sender_display_name: Option<String>,
room_name: Option<String>,
room_alias: Option<RoomAliasId>,
user_is_target: bool,
prio: NotificationPriority,
content: Option<Box<RawJsonValue>>,
counts: NotificationCounts,
devices: Vec<Device>,
}
async fn send_helper(notice: Notice, url: &str, globals: &Globals) -> Result<()> {
let Notice {
event_id,
room_id,
event_type,
sender,
sender_display_name,
room_alias,
room_name,
user_is_target,
prio,
counts,
content,
devices,
} = notice;
let mut outgoing = Notification::new(&devices);
outgoing.event_id = event_id.as_ref();
outgoing.room_id = room_id.as_ref();
outgoing.event_type = event_type.as_ref();
outgoing.sender = sender.as_ref();
outgoing.sender_display_name = sender_display_name.as_deref();
outgoing.room_alias = room_alias.as_ref();
outgoing.room_name = room_name.as_deref();
outgoing.user_is_target = user_is_target;
outgoing.prio = prio;
outgoing.counts = counts;
outgoing.content = content;
send_request(
globals,
&url,
send_event_notification::Request::new(outgoing),
)
.await?;
Ok(())
}

Loading…
Cancel
Save