|
|
|
@ -30,7 +30,10 @@ use ruma::{ |
|
|
|
receipt::ReceiptType, |
|
|
|
receipt::ReceiptType, |
|
|
|
MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId, |
|
|
|
MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId, |
|
|
|
}; |
|
|
|
}; |
|
|
|
use tokio::{select, sync::{Semaphore, RwLock}}; |
|
|
|
use tokio::{ |
|
|
|
|
|
|
|
select, |
|
|
|
|
|
|
|
sync::{RwLock, Semaphore}, |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
use super::abstraction::Tree; |
|
|
|
use super::abstraction::Tree; |
|
|
|
|
|
|
|
|
|
|
|
@ -90,7 +93,11 @@ enum TransactionStatus { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl Sending { |
|
|
|
impl Sending { |
|
|
|
pub fn start_handler(&self, db: Arc<RwLock<Database>>, mut receiver: mpsc::UnboundedReceiver<Vec<u8>>) { |
|
|
|
pub fn start_handler( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
db: Arc<RwLock<Database>>, |
|
|
|
|
|
|
|
mut receiver: mpsc::UnboundedReceiver<Vec<u8>>, |
|
|
|
|
|
|
|
) { |
|
|
|
tokio::spawn(async move { |
|
|
|
tokio::spawn(async move { |
|
|
|
let mut futures = FuturesUnordered::new(); |
|
|
|
let mut futures = FuturesUnordered::new(); |
|
|
|
|
|
|
|
|
|
|
|
@ -102,7 +109,8 @@ impl Sending { |
|
|
|
let guard = db.read().await; |
|
|
|
let guard = db.read().await; |
|
|
|
|
|
|
|
|
|
|
|
for (key, outgoing_kind, event) in |
|
|
|
for (key, outgoing_kind, event) in |
|
|
|
guard.sending |
|
|
|
guard |
|
|
|
|
|
|
|
.sending |
|
|
|
.servercurrentevents |
|
|
|
.servercurrentevents |
|
|
|
.iter() |
|
|
|
.iter() |
|
|
|
.filter_map(|(key, _)| { |
|
|
|
.filter_map(|(key, _)| { |
|
|
|
@ -132,7 +140,11 @@ impl Sending { |
|
|
|
for (outgoing_kind, events) in initial_transactions { |
|
|
|
for (outgoing_kind, events) in initial_transactions { |
|
|
|
current_transaction_status |
|
|
|
current_transaction_status |
|
|
|
.insert(outgoing_kind.get_prefix(), TransactionStatus::Running); |
|
|
|
.insert(outgoing_kind.get_prefix(), TransactionStatus::Running); |
|
|
|
futures.push(Self::handle_events(outgoing_kind.clone(), events, Arc::clone(&db))); |
|
|
|
futures.push(Self::handle_events( |
|
|
|
|
|
|
|
outgoing_kind.clone(), |
|
|
|
|
|
|
|
events, |
|
|
|
|
|
|
|
Arc::clone(&db), |
|
|
|
|
|
|
|
)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
loop { |
|
|
|
loop { |
|
|
|
|