|
|
|
@ -967,17 +967,31 @@ impl Database { |
|
|
|
#[tracing::instrument(skip(db))] |
|
|
|
#[tracing::instrument(skip(db))] |
|
|
|
pub async fn start_watcher_prune_task(db: Arc<TokioRwLock<Self>>) { |
|
|
|
pub async fn start_watcher_prune_task(db: Arc<TokioRwLock<Self>>) { |
|
|
|
use std::time::Duration; |
|
|
|
use std::time::Duration; |
|
|
|
use tokio::time::interval; |
|
|
|
use tokio::time::sleep; |
|
|
|
|
|
|
|
|
|
|
|
tokio::spawn(async move { |
|
|
|
tokio::spawn(async move { |
|
|
|
let mut i = interval(Duration::from_secs(60)); |
|
|
|
let mut interval: usize = 15; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const MAX_PRUNE_INTERVAL_SECS: usize = 60; |
|
|
|
|
|
|
|
|
|
|
|
loop { |
|
|
|
loop { |
|
|
|
i.tick().await; |
|
|
|
sleep(Duration::from_secs(interval)).await; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut pruned = false; |
|
|
|
|
|
|
|
|
|
|
|
for table in db.read().await._db.tables.read().values() { |
|
|
|
for table in db.read().await._db.tables.read().values() { |
|
|
|
if let Some(table) = table.upgrade() { |
|
|
|
if let Some(table) = table.upgrade() { |
|
|
|
table.prune_dead_watchers(); |
|
|
|
pruned |= table.prune_dead_watchers(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if pruned { |
|
|
|
|
|
|
|
if interval > 1 { |
|
|
|
|
|
|
|
interval /= 2; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
if interval < MAX_PRUNE_INTERVAL_SECS { |
|
|
|
|
|
|
|
interval += 1; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|