|
|
|
@ -54,14 +54,15 @@ impl Sending { |
|
|
|
)) |
|
|
|
)) |
|
|
|
}) |
|
|
|
}) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
|
|
|
|
.filter(|pdu| !pdu.is_empty()) // Skip reservation key
|
|
|
|
|
|
|
|
.take(50) |
|
|
|
|
|
|
|
// This should not contain more than 50 anyway
|
|
|
|
{ |
|
|
|
{ |
|
|
|
if !pdu.is_empty() { |
|
|
|
|
|
|
|
current_transactions |
|
|
|
current_transactions |
|
|
|
.entry(server) |
|
|
|
.entry(server) |
|
|
|
.or_insert_with(Vec::new) |
|
|
|
.or_insert_with(Vec::new) |
|
|
|
.push(pdu); |
|
|
|
.push(pdu); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (server, pdus) in current_transactions { |
|
|
|
for (server, pdus) in current_transactions { |
|
|
|
futures.push(Self::handle_event(server, pdus, &globals, &rooms)); |
|
|
|
futures.push(Self::handle_event(server, pdus, &globals, &rooms)); |
|
|
|
@ -95,7 +96,9 @@ impl Sending { |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
.map(|k| { |
|
|
|
.map(|k| { |
|
|
|
k.subslice(prefix.len(), k.len() - prefix.len()) |
|
|
|
k.subslice(prefix.len(), k.len() - prefix.len()) |
|
|
|
}).collect::<Vec<_>>(); |
|
|
|
}) |
|
|
|
|
|
|
|
.take(50) |
|
|
|
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
|
|
|
|
|
|
|
if !new_pdus.is_empty() { |
|
|
|
if !new_pdus.is_empty() { |
|
|
|
for pdu_id in &new_pdus { |
|
|
|
for pdu_id in &new_pdus { |
|
|
|
@ -108,6 +111,7 @@ impl Sending { |
|
|
|
futures.push(Self::handle_event(server, new_pdus, &globals, &rooms)); |
|
|
|
futures.push(Self::handle_event(server, new_pdus, &globals, &rooms)); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
servercurrentpdus.remove(&prefix).unwrap(); |
|
|
|
servercurrentpdus.remove(&prefix).unwrap(); |
|
|
|
|
|
|
|
// servercurrentpdus with the prefix should be empty now
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
Err((_server, _e)) => { |
|
|
|
Err((_server, _e)) => { |
|
|
|
|