diff --git a/src/database.rs b/src/database.rs
index 84ca68d..f5f3a9c 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -772,6 +772,7 @@ impl Database {
         #[cfg(feature = "sqlite")]
         {
             Self::start_wal_clean_task(Arc::clone(&db), config).await;
+            Self::start_watcher_prune_task(Arc::clone(&db)).await;
         }
 
         Ok(db)
@@ -959,6 +960,43 @@ impl Database {
             }
         });
     }
+
+    #[cfg(feature = "sqlite")]
+    #[tracing::instrument(skip(db))]
+    pub async fn start_watcher_prune_task(db: Arc<RwLock<Self>>) {
+        use std::time::Duration;
+        use tokio::time::sleep;
+
+        tokio::spawn(async move {
+            let mut interval: u64 = 15;
+
+            const MAX_PRUNE_INTERVAL_SECS: u64 = 60;
+
+            loop {
+                sleep(Duration::from_secs(interval)).await;
+
+                let mut pruned = false;
+
+                for table in db.read().await._db.tables.read().values() {
+                    if let Some(table) = table.upgrade() {
+                        pruned |= table.prune_dead_watchers();
+                    }
+                }
+
+                // Halve the next interval if anything was pruned (down to a minimum of 1 second).
+                // Add one second to the next interval if nothing was pruned (up to MAX_PRUNE_INTERVAL_SECS).
+                if pruned {
+                    if interval > 1 {
+                        interval /= 2;
+                    }
+                } else {
+                    if interval < MAX_PRUNE_INTERVAL_SECS {
+                        interval += 1;
+                    }
+                }
+            }
+        });
+    }
 }
 
 pub struct DatabaseGuard(OwnedRwLockReadGuard<Database>);
diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs
index 1d2038c..2848ab6 100644
--- a/src/database/abstraction/sqlite.rs
+++ b/src/database/abstraction/sqlite.rs
@@ -8,7 +8,7 @@ use std::{
     future::Future,
     path::{Path, PathBuf},
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, Weak},
 };
 use thread_local::ThreadLocal;
 use tokio::sync::watch;
@@ -46,6 +46,8 @@ pub struct Engine {
 
     path: PathBuf,
     cache_size_per_thread: u32,
+
+    pub(in crate::database) tables: RwLock<HashMap<String, Weak<SqliteTable>>>,
 }
 
 impl Engine {
@@ -102,19 +104,46 @@ impl DatabaseEngine for Engine {
             read_iterator_conn_tls: ThreadLocal::new(),
             path,
             cache_size_per_thread,
+            tables: RwLock::new(HashMap::new()),
         });
 
         Ok(arc)
     }
 
     fn open_tree(self: &Arc<Self>, name: &str) -> Result<Arc<dyn Tree>> {
-        self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
+        fn create_new(engine: &Arc<Engine>, name: &str) -> Result<SqliteTable> {
+            engine.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
+
+            Ok(SqliteTable {
+                engine: Arc::clone(engine),
+                name: name.to_owned(),
+                watchers: RwLock::new(HashMap::new()),
+            })
+        }
+
+        // Table mappings are `Weak` to prevent reference cycles; that is what makes the extra correctness logic below necessary.
+        Ok(match self.tables.write().entry(name.to_string()) {
+            hash_map::Entry::Occupied(mut o) => {
+                if let Some(table) = o.get().upgrade() {
+                    table
+                } else {
+                    // On the off-chance that a table was dropped somewhere...
+
+                    let table = Arc::new(create_new(self, name)?);
+
+                    o.insert(Arc::downgrade(&table));
+
+                    table
+                }
+            }
+            hash_map::Entry::Vacant(v) => {
+                let table = Arc::new(create_new(self, name)?);
+
+                v.insert(Arc::downgrade(&table));
 
-        Ok(Arc::new(SqliteTable {
-            engine: Arc::clone(self),
-            name: name.to_owned(),
-            watchers: RwLock::new(HashMap::new()),
-        }))
+                table
+            }
+        })
     }
 
     fn flush(self: &Arc<Self>) -> Result<()> {
@@ -375,7 +404,8 @@ impl Tree for SqliteTable {
         };
 
         Box::pin(async move {
-            // Tx is never destroyed
+            // Tx will only be destroyed once receiver_count == 1
+            // and self.watchers is held under a write lock
             rx.changed().await.unwrap();
         })
     }
@@ -390,3 +420,41 @@ impl Tree for SqliteTable {
         Ok(())
     }
 }
+
+impl SqliteTable {
+    pub fn prune_dead_watchers(&self) -> bool {
+        let watchers = self.watchers.read();
+        let mut interesting = Vec::new();
+        for (key, (tx, _)) in watchers.iter() {
+            if tx.receiver_count() == 1 {
+                // We take the read lock first to collect all "interesting" keys without blocking other watch threads.
+                interesting.push(key.clone());
+            }
+        }
+
+        drop(watchers);
+
+        if !interesting.is_empty() {
+            let mut watchers = self.watchers.write();
+            let mut cleared = 0;
+            for prefix in interesting {
+                // Test for occupancy; there is only the slightest chance this entry was deleted between then and now.
+                if let hash_map::Entry::Occupied(o) = watchers.entry(prefix) {
+                    // Check one last time; it's possible a receiver was cloned between then and now.
+                    if o.get().0.receiver_count() == 1 {
+                        // The only receiver left is the one bundled in the entry itself, so it's okay to drop this.
+                        o.remove_entry();
+
+                        cleared += 1;
+                    }
+                }
+            }
+
+            debug!("Cleared {} dead watchers.", cleared);
+
+            return cleared != 0;
+        }
+
+        false
+    }
+}
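To see the two-phase scheme in `prune_dead_watchers` in isolation: the sketch below is a minimal, self-contained restatement, not part of the patch. `Watchers` and `prune_dead` are hypothetical stand-in names, and `std::sync::RwLock` stands in for the lock type the patch uses; only `tokio::sync::watch` is assumed.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

use tokio::sync::watch;

/// Hypothetical stand-in for a table's watcher map: each entry bundles the
/// `Sender` with one `Receiver`, kept alive so later subscribers can clone it.
pub struct Watchers {
    map: RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>,
}

impl Watchers {
    /// Collect candidates under the read lock, then re-check each one under
    /// the write lock: a subscriber may clone the bundled `Receiver` between
    /// the two phases, so the count must be verified again before removal.
    pub fn prune_dead(&self) -> bool {
        // Phase 1: read lock only, so concurrent watchers are never blocked.
        let candidates: Vec<Vec<u8>> = self
            .map
            .read()
            .unwrap()
            .iter()
            .filter(|(_, (tx, _))| tx.receiver_count() == 1)
            .map(|(key, _)| key.clone())
            .collect();

        if candidates.is_empty() {
            return false;
        }

        // Phase 2: write lock, double-checking before each removal.
        let mut map = self.map.write().unwrap();
        let mut cleared = 0;
        for key in candidates {
            if let Some((tx, _)) = map.get(&key) {
                // A count of 1 means the only receiver left is the one stored
                // in this very entry: nobody is actually waiting on it.
                if tx.receiver_count() == 1 {
                    map.remove(&key);
                    cleared += 1;
                }
            }
        }
        cleared != 0
    }
}
```

The double check in phase 2 is what makes the optimistic read-only scan safe: dropping a stale candidate costs nothing, because the next prune pass will find it again.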
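The scheduling in `start_watcher_prune_task` is additive increase, multiplicative decrease: idle passes add one second (capped at `MAX_PRUNE_INTERVAL_SECS`), productive passes halve the wait (floored at one second). A compact restatement, using a hypothetical `next_interval` helper with the same constants:

```rust
/// Hypothetical helper, behaviorally equivalent to the patch's inline update.
fn next_interval(interval: u64, pruned: bool) -> u64 {
    const MAX_PRUNE_INTERVAL_SECS: u64 = 60;
    if pruned {
        // Multiplicative decrease, floored at 1 second.
        (interval / 2).max(1)
    } else {
        // Additive increase, capped at MAX_PRUNE_INTERVAL_SECS.
        (interval + 1).min(MAX_PRUNE_INTERVAL_SECS)
    }
}
```

Starting from the initial 15 seconds, a long idle stretch walks the interval up to the 60-second ceiling one second at a time, while a burst of dying watchers drives it down quickly (60 → 30 → 15 → ...), so cleanup latency adapts to load.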