From 3015dfd04c8bbf263bd59f6ab759d9c9160bc6da Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 22 Nov 2021 22:42:08 +0100 Subject: [PATCH] Add watcher prune task for sqlite --- src/database.rs | 20 ++++++++++ src/database/abstraction/sqlite.rs | 61 ++++++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/src/database.rs b/src/database.rs index 080e24b..c3ca057 100644 --- a/src/database.rs +++ b/src/database.rs @@ -774,6 +774,7 @@ impl Database { #[cfg(feature = "sqlite")] { Self::start_wal_clean_task(Arc::clone(&db), config).await; + Self::start_watcher_prune_task(Arc::clone(&db)).await; } Ok(db) @@ -961,6 +962,25 @@ impl Database { } }); } + + #[cfg(feature = "sqlite")] + #[tracing::instrument(skip(db))] + pub async fn start_watcher_prune_task(db: Arc>) { + use std::time::Duration; + use tokio::time::interval; + + tokio::spawn(async move { + let mut i = interval(Duration::from_secs(60)); + + loop { + i.tick().await; + + for table in db.read().await._db.tables.read().values() { + table.prune_dead_watchers(); + } + } + }); + } } pub struct DatabaseGuard(OwnedRwLockReadGuard); diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 1d2038c..6433059 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -46,6 +46,8 @@ pub struct Engine { path: PathBuf, cache_size_per_thread: u32, + + pub(in crate::database) tables: RwLock>>, } impl Engine { @@ -102,19 +104,29 @@ impl DatabaseEngine for Engine { read_iterator_conn_tls: ThreadLocal::new(), path, cache_size_per_thread, + tables: RwLock::new(HashMap::new()), }); Ok(arc) } fn open_tree(self: &Arc, name: &str) -> Result> { - self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + Ok(match self.tables.write().entry(name.to_string()) { + hash_map::Entry::Occupied(o) => o.get().clone(), + hash_map::Entry::Vacant(v) => { + self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + + let table = Arc::new(SqliteTable { + engine: Arc::clone(self), + name: name.to_owned(), + watchers: RwLock::new(HashMap::new()), + }); - Ok(Arc::new(SqliteTable { - engine: Arc::clone(self), - name: name.to_owned(), - watchers: RwLock::new(HashMap::new()), - })) + v.insert(table.clone()); + + table + } + }) } fn flush(self: &Arc) -> Result<()> { @@ -375,7 +387,8 @@ impl Tree for SqliteTable { }; Box::pin(async move { - // Tx is never destroyed + // Tx will be destroyed if receiver_count = 1, + // and self.watchers is held with write lock rx.changed().await.unwrap(); }) } @@ -390,3 +403,37 @@ impl Tree for SqliteTable { Ok(()) } } + +impl SqliteTable { + pub fn prune_dead_watchers(&self) { + let watchers = self.watchers.read(); + let mut interesting = Vec::new(); + for (key, (tx, _)) in watchers.iter() { + if tx.receiver_count() == 1 { + // We do read mode first to get all "interesting" keys, not blocking other watch threads. + interesting.push(key.clone()) + } + } + + drop(watchers); + + if !interesting.is_empty() { + let mut watchers = self.watchers.write(); + let mut cleared = 0; + for prefix in interesting { + // Test for occupied, there is only the slightest chance this entry has been deleted inbetween then and now. + if let hash_map::Entry::Occupied(o) = watchers.entry(prefix) { + // Check one last time, its possible a receiver is cloned inbetween then and now. + if o.get().0.receiver_count() == 1 { + // Receiver == 1, this is the receiver bundled in the entry, so it's okay to drop this. + o.remove_entry(); + + cleared += 1; + } + } + } + + debug!("Cleared {} dead watchers.", cleared) + } + } +}