
Merge branch 'watch-prune' into 'next'

Add watcher prune task for sqlite

See merge request famedly/conduit!227
Jonathan de Jong 4 years ago
commit 5aa7592bfe
2 changed files:
  src/database.rs (38 lines changed)
  src/database/abstraction/sqlite.rs (80 lines changed)
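
In short, the change gives the sqlite `Engine` a registry of `Weak` handles to its tables and spawns a background task that periodically drops watcher entries nobody listens to anymore. The liveness test is the receiver count of each watcher's `tokio::sync::watch` channel: every entry keeps one `Receiver` bundled next to its `Sender`, so a count of exactly 1 means no caller is still waiting. A minimal sketch of that observation (the bundled-receiver setup is paraphrased from the diff; names are illustrative):

```rust
// Sketch (not from the diff): why `receiver_count() == 1` identifies a
// dead watcher when one Receiver stays bundled with the Sender in the map.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // The watchers map would store this (tx, rx) pair together.
    let (tx, bundled_rx) = watch::channel(());

    // A watch call hands a clone to the caller.
    let caller_rx = bundled_rx.clone();
    assert_eq!(tx.receiver_count(), 2); // bundled + caller: keep it

    drop(caller_rx);
    assert_eq!(tx.receiver_count(), 1); // only the bundled one: prunable
}
```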

src/database.rs

@@ -772,6 +772,7 @@ impl Database {
         #[cfg(feature = "sqlite")]
         {
             Self::start_wal_clean_task(Arc::clone(&db), config).await;
+            Self::start_watcher_prune_task(Arc::clone(&db)).await;
         }

         Ok(db)
@@ -959,6 +960,43 @@ impl Database {
             }
         });
     }
+
+    #[cfg(feature = "sqlite")]
+    #[tracing::instrument(skip(db))]
+    pub async fn start_watcher_prune_task(db: Arc<TokioRwLock<Self>>) {
+        use std::time::Duration;
+        use tokio::time::sleep;
+
+        tokio::spawn(async move {
+            let mut interval: u64 = 15;
+
+            const MAX_PRUNE_INTERVAL_SECS: u64 = 60;
+
+            loop {
+                sleep(Duration::from_secs(interval)).await;
+
+                let mut pruned = false;
+
+                for table in db.read().await._db.tables.read().values() {
+                    if let Some(table) = table.upgrade() {
+                        pruned |= table.prune_dead_watchers();
+                    }
+                }
+
+                // Halve the next interval if anything was pruned (down to a floor
+                // of 1 second); otherwise add one second to it (up to a ceiling
+                // of MAX_PRUNE_INTERVAL_SECS).
+                if pruned {
+                    if interval > 1 {
+                        interval /= 2;
+                    }
+                } else if interval < MAX_PRUNE_INTERVAL_SECS {
+                    interval += 1;
+                }
+            }
+        });
+    }
 }

 pub struct DatabaseGuard(OwnedRwLockReadGuard<Database>);
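
The prune task's schedule is self-adjusting rather than fixed: it halves the delay while pruning keeps finding dead watchers and creeps back up by one second while it doesn't, clamped to [1, MAX_PRUNE_INTERVAL_SECS]. A standalone sketch of that arithmetic (the helper name is hypothetical, not part of the MR):

```rust
// Hypothetical extraction of the prune task's schedule: halve while work
// keeps turning up, back off by one second while it doesn't.
const MAX_PRUNE_INTERVAL_SECS: u64 = 60;

fn next_interval(current: u64, pruned: bool) -> u64 {
    if pruned {
        (current / 2).max(1) // halve, but never below 1 second
    } else {
        (current + 1).min(MAX_PRUNE_INTERVAL_SECS) // creep toward the ceiling
    }
}

fn main() {
    let mut interval = 15; // the task's starting value
    for pruned in [true, true, false, false, false] {
        interval = next_interval(interval, pruned);
        print!("{interval}s "); // prints: 7s 3s 4s 5s 6s
    }
    println!();
}
```

Starting from 15s, two productive rounds drop the delay to 3s; idle rounds then back it off one second at a time toward the 60s ceiling.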

src/database/abstraction/sqlite.rs

@@ -8,7 +8,7 @@ use std::{
     future::Future,
     path::{Path, PathBuf},
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, Weak},
 };
 use thread_local::ThreadLocal;
 use tokio::sync::watch;
@@ -46,6 +46,8 @@ pub struct Engine {
     path: PathBuf,
     cache_size_per_thread: u32,
+
+    pub(in crate::database) tables: RwLock<HashMap<String, Weak<SqliteTable>>>,
 }

 impl Engine {
@@ -102,19 +104,46 @@ impl DatabaseEngine for Engine {
             read_iterator_conn_tls: ThreadLocal::new(),
             path,
             cache_size_per_thread,
+            tables: RwLock::new(HashMap::new()),
         });

         Ok(arc)
     }

     fn open_tree(self: &Arc<Self>, name: &str) -> Result<Arc<dyn Tree>> {
-        self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
-
-        Ok(Arc::new(SqliteTable {
-            engine: Arc::clone(self),
-            name: name.to_owned(),
-            watchers: RwLock::new(HashMap::new()),
-        }))
+        fn create_new(engine: &Arc<Engine>, name: &str) -> Result<SqliteTable> {
+            engine.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
+
+            Ok(SqliteTable {
+                engine: Arc::clone(engine),
+                name: name.to_owned(),
+                watchers: RwLock::new(HashMap::new()),
+            })
+        }
+
+        // Table mappings are `Weak` to prevent reference cycles, which is what
+        // makes this additional correctness logic necessary.
+        Ok(match self.tables.write().entry(name.to_string()) {
+            hash_map::Entry::Occupied(mut o) => {
+                if let Some(table) = o.get().upgrade() {
+                    table
+                } else {
+                    // On the off-chance that a table was dropped somewhere...
+                    let table = Arc::new(create_new(self, name)?);
+                    o.insert(Arc::downgrade(&table));
+                    table
+                }
+            }
+            hash_map::Entry::Vacant(v) => {
+                let table = Arc::new(create_new(self, name)?);
+                v.insert(Arc::downgrade(&table));
+                table
+            }
+        })
     }

     fn flush(self: &Arc<Self>) -> Result<()> {
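
The `Weak` handles in `tables` are the subtle part of the new `open_tree`: each `SqliteTable` holds an `Arc<Engine>`, so if the engine's map held strong `Arc<SqliteTable>` handles back, the cycle would never be collected. A self-contained sketch of the same upgrade-or-create pattern (types and names are illustrative only):

```rust
// Illustrative reduction of the Weak-registry pattern: the map never keeps
// a table alive on its own; it upgrades a live handle or rebuilds one.
use std::collections::{hash_map, HashMap};
use std::sync::{Arc, Weak};

struct Table {
    name: String,
}

fn open(registry: &mut HashMap<String, Weak<Table>>, name: &str) -> Arc<Table> {
    match registry.entry(name.to_owned()) {
        hash_map::Entry::Occupied(mut o) => match o.get().upgrade() {
            // A strong handle still exists somewhere: hand out another one.
            Some(table) => table,
            None => {
                // Every strong handle was dropped; rebuild and re-register.
                let table = Arc::new(Table { name: name.to_owned() });
                o.insert(Arc::downgrade(&table));
                table
            }
        },
        hash_map::Entry::Vacant(v) => {
            let table = Arc::new(Table { name: name.to_owned() });
            v.insert(Arc::downgrade(&table));
            table
        }
    }
}

fn main() {
    let mut registry = HashMap::new();
    let a = open(&mut registry, "pdus");
    let b = open(&mut registry, "pdus");
    assert!(Arc::ptr_eq(&a, &b)); // second open reused the live table
    println!("opened {}", a.name);

    drop((a, b));
    // The registry alone cannot keep the table alive.
    assert!(registry["pdus"].upgrade().is_none());
}
```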
@@ -375,7 +404,8 @@ impl Tree for SqliteTable {
         };

         Box::pin(async move {
-            // Tx is never destroyed
+            // Tx is only destroyed once receiver_count == 1, and only while
+            // self.watchers is held under a write lock, so this unwrap is safe.
             rx.changed().await.unwrap();
         })
     }
@@ -390,3 +420,41 @@ impl Tree for SqliteTable {
         Ok(())
     }
 }
+
+impl SqliteTable {
+    pub fn prune_dead_watchers(&self) -> bool {
+        // Read mode first: collect all "interesting" keys without blocking
+        // other watch threads.
+        let watchers = self.watchers.read();
+        let mut interesting = Vec::new();
+        for (key, (tx, _)) in watchers.iter() {
+            if tx.receiver_count() == 1 {
+                interesting.push(key.clone());
+            }
+        }
+        drop(watchers);
+
+        if !interesting.is_empty() {
+            let mut watchers = self.watchers.write();
+
+            let mut cleared = 0;
+
+            for prefix in interesting {
+                // Test for occupancy; there is only the slightest chance this
+                // entry was deleted between then and now.
+                if let hash_map::Entry::Occupied(o) = watchers.entry(prefix) {
+                    // Check one last time; it's possible a receiver was cloned
+                    // between then and now.
+                    if o.get().0.receiver_count() == 1 {
+                        // The one remaining receiver is the one bundled in the
+                        // entry itself, so it's okay to drop this.
+                        o.remove_entry();
+                        cleared += 1;
+                    }
+                }
+            }
+
+            debug!("Cleared {} dead watchers.", cleared);
+
+            return cleared != 0;
+        }
+        false
+    }
+}
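
`prune_dead_watchers` is a two-phase scan: candidates are gathered under the read lock so concurrent watchers aren't blocked, then each one is re-verified under the write lock, since a receiver could have been cloned in between. A reduced sketch of that locking shape, assuming a `parking_lot`-style `RwLock` (consistent with the unguarded `.read()`/`.write()` calls above) and a toy counter map standing in for the watcher table:

```rust
// Sketch of the two-phase read-then-write pattern; a u32 of 0 stands in
// for "receiver_count() == 1" as the deadness test.
use parking_lot::RwLock;
use std::collections::{hash_map, HashMap};

fn prune(map: &RwLock<HashMap<String, u32>>) -> bool {
    // Phase 1: under the read lock, collect keys that look dead.
    let candidates: Vec<String> = map
        .read()
        .iter()
        .filter(|&(_, &v)| v == 0)
        .map(|(k, _)| k.clone())
        .collect();

    if candidates.is_empty() {
        return false;
    }

    // Phase 2: take the write lock and re-check each candidate before
    // removing it; it may have changed since phase 1.
    let mut map = map.write();
    let mut cleared = 0;
    for key in candidates {
        if let hash_map::Entry::Occupied(o) = map.entry(key) {
            if *o.get() == 0 {
                o.remove_entry();
                cleared += 1;
            }
        }
    }
    cleared != 0
}

fn main() {
    let map = RwLock::new(HashMap::from([
        ("dead".to_owned(), 0u32),
        ("live".to_owned(), 2u32),
    ]));
    assert!(prune(&map));
    assert_eq!(map.read().len(), 1); // only "live" remains
}
```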
