From 3015dfd04c8bbf263bd59f6ab759d9c9160bc6da Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 22 Nov 2021 22:42:08 +0100 Subject: [PATCH 1/5] Add watcher prune task for sqlite --- src/database.rs | 20 ++++++++++ src/database/abstraction/sqlite.rs | 61 ++++++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/src/database.rs b/src/database.rs index 080e24b..c3ca057 100644 --- a/src/database.rs +++ b/src/database.rs @@ -774,6 +774,7 @@ impl Database { #[cfg(feature = "sqlite")] { Self::start_wal_clean_task(Arc::clone(&db), config).await; + Self::start_watcher_prune_task(Arc::clone(&db)).await; } Ok(db) @@ -961,6 +962,25 @@ impl Database { } }); } + + #[cfg(feature = "sqlite")] + #[tracing::instrument(skip(db))] + pub async fn start_watcher_prune_task(db: Arc>) { + use std::time::Duration; + use tokio::time::interval; + + tokio::spawn(async move { + let mut i = interval(Duration::from_secs(60)); + + loop { + i.tick().await; + + for table in db.read().await._db.tables.read().values() { + table.prune_dead_watchers(); + } + } + }); + } } pub struct DatabaseGuard(OwnedRwLockReadGuard); diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 1d2038c..6433059 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -46,6 +46,8 @@ pub struct Engine { path: PathBuf, cache_size_per_thread: u32, + + pub(in crate::database) tables: RwLock>>, } impl Engine { @@ -102,19 +104,29 @@ impl DatabaseEngine for Engine { read_iterator_conn_tls: ThreadLocal::new(), path, cache_size_per_thread, + tables: RwLock::new(HashMap::new()), }); Ok(arc) } fn open_tree(self: &Arc, name: &str) -> Result> { - self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + Ok(match self.tables.write().entry(name.to_string()) { + hash_map::Entry::Occupied(o) => o.get().clone(), + hash_map::Entry::Vacant(v) => { + self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + + let table = Arc::new(SqliteTable { + engine: Arc::clone(self), + name: name.to_owned(), + watchers: RwLock::new(HashMap::new()), + }); - Ok(Arc::new(SqliteTable { - engine: Arc::clone(self), - name: name.to_owned(), - watchers: RwLock::new(HashMap::new()), - })) + v.insert(table.clone()); + + table + } + }) } fn flush(self: &Arc) -> Result<()> { @@ -375,7 +387,8 @@ impl Tree for SqliteTable { }; Box::pin(async move { - // Tx is never destroyed + // Tx will be destroyed if receiver_count = 1, + // and self.watchers is held with write lock rx.changed().await.unwrap(); }) } @@ -390,3 +403,37 @@ impl Tree for SqliteTable { Ok(()) } } + +impl SqliteTable { + pub fn prune_dead_watchers(&self) { + let watchers = self.watchers.read(); + let mut interesting = Vec::new(); + for (key, (tx, _)) in watchers.iter() { + if tx.receiver_count() == 1 { + // We do read mode first to get all "interesting" keys, not blocking other watch threads. + interesting.push(key.clone()) + } + } + + drop(watchers); + + if !interesting.is_empty() { + let mut watchers = self.watchers.write(); + let mut cleared = 0; + for prefix in interesting { + // Test for occupied, there is only the slightest chance this entry has been deleted inbetween then and now. + if let hash_map::Entry::Occupied(o) = watchers.entry(prefix) { + // Check one last time, its possible a receiver is cloned inbetween then and now. + if o.get().0.receiver_count() == 1 { + // Receiver == 1, this is the receiver bundled in the entry, so it's okay to drop this. + o.remove_entry(); + + cleared += 1; + } + } + } + + debug!("Cleared {} dead watchers.", cleared) + } + } +} From 316425ccc31db45e0b7ab958ec33cf9faf27584b Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 22 Nov 2021 22:58:09 +0100 Subject: [PATCH 2/5] make SqliteTable value in hashmap Weak --- src/database.rs | 4 ++- src/database/abstraction/sqlite.rs | 39 +++++++++++++++++++++--------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/database.rs b/src/database.rs index c3ca057..6900088 100644 --- a/src/database.rs +++ b/src/database.rs @@ -976,7 +976,9 @@ impl Database { i.tick().await; for table in db.read().await._db.tables.read().values() { - table.prune_dead_watchers(); + if let Some(table) = table.upgrade() { + table.prune_dead_watchers(); + } } } }); diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 6433059..6717013 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -8,7 +8,7 @@ use std::{ future::Future, path::{Path, PathBuf}, pin::Pin, - sync::Arc, + sync::{Arc, Weak}, }; use thread_local::ThreadLocal; use tokio::sync::watch; @@ -47,7 +47,7 @@ pub struct Engine { path: PathBuf, cache_size_per_thread: u32, - pub(in crate::database) tables: RwLock>>, + pub(in crate::database) tables: RwLock>>, } impl Engine { @@ -111,18 +111,35 @@ impl DatabaseEngine for Engine { } fn open_tree(self: &Arc, name: &str) -> Result> { + fn create_new(engine: &Arc, name: &str) -> Result { + engine.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + + SqliteTable { + engine: Arc::clone(engine), + name: name.to_owned(), + watchers: RwLock::new(HashMap::new()), + } + } + + // Table mappings are `Weak` to prevent reference cycles, that creates this additional correctness logic. Ok(match self.tables.write().entry(name.to_string()) { - hash_map::Entry::Occupied(o) => o.get().clone(), - hash_map::Entry::Vacant(v) => { - self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + hash_map::Entry::Occupied(o) => { + if let Some(table) = o.get().upgrade() { + table + } else { + // On the off-chance that a table was dropped somewhere... - let table = Arc::new(SqliteTable { - engine: Arc::clone(self), - name: name.to_owned(), - watchers: RwLock::new(HashMap::new()), - }); + let table = Arc::new(create_new(self, name)?); + + o.insert(table.downgrade()); + + table + } + } + hash_map::Entry::Vacant(v) => { + let table = Arc::new(create_new(self, name)?); - v.insert(table.clone()); + v.insert(table.downgrade()); table } From 794b84c4041e2255acb69691d9916cc8cc7685f3 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 22 Nov 2021 23:01:40 +0100 Subject: [PATCH 3/5] whoops --- src/database/abstraction/sqlite.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 6717013..06a8673 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -114,16 +114,16 @@ impl DatabaseEngine for Engine { fn create_new(engine: &Arc, name: &str) -> Result { engine.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; - SqliteTable { + Ok(SqliteTable { engine: Arc::clone(engine), name: name.to_owned(), watchers: RwLock::new(HashMap::new()), - } + }) } // Table mappings are `Weak` to prevent reference cycles, that creates this additional correctness logic. Ok(match self.tables.write().entry(name.to_string()) { - hash_map::Entry::Occupied(o) => { + hash_map::Entry::Occupied(mut o) => { if let Some(table) = o.get().upgrade() { table } else { @@ -131,7 +131,7 @@ impl DatabaseEngine for Engine { let table = Arc::new(create_new(self, name)?); - o.insert(table.downgrade()); + o.insert(Arc::downgrade(&table)); table } @@ -139,7 +139,7 @@ impl DatabaseEngine for Engine { hash_map::Entry::Vacant(v) => { let table = Arc::new(create_new(self, name)?); - v.insert(table.downgrade()); + v.insert(Arc::downgrade(&table)); table } From 87f0702dc9c64bc9b59c07113166ddbad0969408 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 23 Nov 2021 19:11:29 +0100 Subject: [PATCH 4/5] adaptive prune intervals --- src/database.rs | 22 ++++++++++++++++++---- src/database/abstraction/sqlite.rs | 8 ++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/database.rs b/src/database.rs index 6900088..c198f0e 100644 --- a/src/database.rs +++ b/src/database.rs @@ -967,17 +967,31 @@ impl Database { #[tracing::instrument(skip(db))] pub async fn start_watcher_prune_task(db: Arc>) { use std::time::Duration; - use tokio::time::interval; + use tokio::time::sleep; tokio::spawn(async move { - let mut i = interval(Duration::from_secs(60)); + let mut interval: usize = 15; + + const MAX_PRUNE_INTERVAL_SECS: usize = 60; loop { - i.tick().await; + sleep(Duration::from_secs(interval)).await; + + let mut pruned = false; for table in db.read().await._db.tables.read().values() { if let Some(table) = table.upgrade() { - table.prune_dead_watchers(); + pruned |= table.prune_dead_watchers(); + } + } + + if pruned { + if interval > 1 { + interval /= 2; + } + } else { + if interval < MAX_PRUNE_INTERVAL_SECS { + interval += 1; } } } diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 06a8673..2848ab6 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -422,7 +422,7 @@ impl Tree for SqliteTable { } impl SqliteTable { - pub fn prune_dead_watchers(&self) { + pub fn prune_dead_watchers(&self) -> bool { let watchers = self.watchers.read(); let mut interesting = Vec::new(); for (key, (tx, _)) in watchers.iter() { @@ -450,7 +450,11 @@ impl SqliteTable { } } - debug!("Cleared {} dead watchers.", cleared) + debug!("Cleared {} dead watchers.", cleared); + + return cleared != 0; } + + false } } From 83e9655c0c2ff6e609cba8e4499cf0527f31a0ed Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 23 Nov 2021 19:13:11 +0100 Subject: [PATCH 5/5] add comments and fix type --- src/database.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/database.rs b/src/database.rs index c198f0e..4177b1a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -970,9 +970,9 @@ impl Database { use tokio::time::sleep; tokio::spawn(async move { - let mut interval: usize = 15; + let mut interval: u64 = 15; - const MAX_PRUNE_INTERVAL_SECS: usize = 60; + const MAX_PRUNE_INTERVAL_SECS: u64 = 60; loop { sleep(Duration::from_secs(interval)).await; @@ -985,6 +985,8 @@ impl Database { } } + // Will half the next interval if it found anything (until its 1 second) + // Will add one second to next interval if it didn't find anything (until its >=MAX_PRUNE_INTERVAL_SECS) if pruned { if interval > 1 { interval /= 2;