
Add watcher prune task for sqlite

merge-requests/227/head
Jonathan de Jong, 4 years ago
commit 3015dfd04c

  1. src/database.rs (20 lines changed)
  2. src/database/abstraction/sqlite.rs (61 lines changed)

src/database.rs (20 lines changed)

@@ -774,6 +774,7 @@ impl Database {
         #[cfg(feature = "sqlite")]
         {
             Self::start_wal_clean_task(Arc::clone(&db), config).await;
+            Self::start_watcher_prune_task(Arc::clone(&db)).await;
         }

         Ok(db)
@@ -961,6 +962,25 @@ impl Database {
             }
         });
     }
+
+    #[cfg(feature = "sqlite")]
+    #[tracing::instrument(skip(db))]
+    pub async fn start_watcher_prune_task(db: Arc<TokioRwLock<Self>>) {
+        use std::time::Duration;
+        use tokio::time::interval;
+
+        tokio::spawn(async move {
+            let mut i = interval(Duration::from_secs(60));
+
+            loop {
+                i.tick().await;
+
+                for table in db.read().await._db.tables.read().values() {
+                    table.prune_dead_watchers();
+                }
+            }
+        });
+    }
 }

 pub struct DatabaseGuard(OwnedRwLockReadGuard<Database>);
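
Note: the spawned task uses tokio::time::interval, whose first tick completes immediately, so one prune pass runs right after startup and subsequent passes run every 60 seconds. A standalone sketch (not part of this commit) of that interval behaviour:

    use std::time::{Duration, Instant};
    use tokio::time::interval;

    #[tokio::main]
    async fn main() {
        let start = Instant::now();
        let mut i = interval(Duration::from_secs(60));

        // The first tick resolves immediately, so the prune loop does one pass
        // as soon as the task is spawned.
        i.tick().await;
        assert!(start.elapsed() < Duration::from_secs(1));

        // A second i.tick().await would resolve roughly 60 seconds after `start`.
    }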

src/database/abstraction/sqlite.rs (61 lines changed)

@@ -46,6 +46,8 @@ pub struct Engine {
     path: PathBuf,
     cache_size_per_thread: u32,
+
+    pub(in crate::database) tables: RwLock<HashMap<String, Arc<SqliteTable>>>,
 }

 impl Engine {
@@ -102,19 +104,29 @@ impl DatabaseEngine for Engine {
             read_iterator_conn_tls: ThreadLocal::new(),
             path,
             cache_size_per_thread,
+            tables: RwLock::new(HashMap::new()),
         });

         Ok(arc)
     }

     fn open_tree(self: &Arc<Self>, name: &str) -> Result<Arc<dyn Tree>> {
-        self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
-
-        Ok(Arc::new(SqliteTable {
-            engine: Arc::clone(self),
-            name: name.to_owned(),
-            watchers: RwLock::new(HashMap::new()),
-        }))
+        Ok(match self.tables.write().entry(name.to_string()) {
+            hash_map::Entry::Occupied(o) => o.get().clone(),
+            hash_map::Entry::Vacant(v) => {
+                self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
+
+                let table = Arc::new(SqliteTable {
+                    engine: Arc::clone(self),
+                    name: name.to_owned(),
+                    watchers: RwLock::new(HashMap::new()),
+                });
+
+                v.insert(table.clone());
+
+                table
+            }
+        })
     }

     fn flush(self: &Arc<Self>) -> Result<()> {
@@ -375,7 +387,8 @@ impl Tree for SqliteTable {
         };

         Box::pin(async move {
-            // Tx is never destroyed
+            // Tx will be destroyed if receiver_count = 1,
+            // and self.watchers is held with a write lock
             rx.changed().await.unwrap();
         })
     }
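
The updated comment reflects the bookkeeping that prune_dead_watchers (added below) relies on: each watchers entry stores the watch Sender together with one Receiver, so tx.receiver_count() falls back to 1 exactly when no caller is still waiting on that prefix. A minimal standalone sketch (not part of this commit) of that invariant:

    use tokio::sync::watch;

    fn main() {
        // The (tx, rx) pair mirrors what a watchers entry keeps in the map.
        let (tx, rx) = watch::channel(());
        assert_eq!(tx.receiver_count(), 1); // only the stored Receiver exists

        let waiter = rx.clone(); // a caller starts watching this prefix
        assert_eq!(tx.receiver_count(), 2); // the entry must not be pruned

        drop(waiter); // the watching future completed or was dropped
        assert_eq!(tx.receiver_count(), 1); // now safe to remove the entry
    }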
@@ -390,3 +403,37 @@ impl Tree for SqliteTable {
         Ok(())
     }
 }
+
+impl SqliteTable {
+    pub fn prune_dead_watchers(&self) {
+        let watchers = self.watchers.read();
+        let mut interesting = Vec::new();
+        for (key, (tx, _)) in watchers.iter() {
+            if tx.receiver_count() == 1 {
+                // Take a read lock first to collect all "interesting" keys,
+                // so other watch threads are not blocked.
+                interesting.push(key.clone())
+            }
+        }
+        drop(watchers);
+
+        if !interesting.is_empty() {
+            let mut watchers = self.watchers.write();
+            let mut cleared = 0;
+            for prefix in interesting {
+                // Test for occupied; there is only the slightest chance this
+                // entry has been deleted in between then and now.
+                if let hash_map::Entry::Occupied(o) = watchers.entry(prefix) {
+                    // Check one last time; it's possible a receiver was cloned
+                    // in between then and now.
+                    if o.get().0.receiver_count() == 1 {
+                        // receiver_count == 1: the only receiver left is the one
+                        // bundled in the entry, so it's okay to drop this.
+                        o.remove_entry();
+                        cleared += 1;
+                    }
+                }
+            }
+
+            debug!("Cleared {} dead watchers.", cleared)
+        }
+    }
+}
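
For reference, the same read-then-write pruning shape in a self-contained form. This is only a sketch: it uses std::sync::RwLock and a plain HashMap instead of the engine's own types, and the names Watchers and prune_dead are made up for illustration. Candidates are collected under a read lock so concurrent watchers are not blocked, then each key is re-checked under the write lock before removal, in case a receiver was cloned in between.

    use std::collections::HashMap;
    use std::sync::RwLock;
    use tokio::sync::watch;

    type Watchers = RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>;

    fn prune_dead(watchers: &Watchers) -> usize {
        // Phase 1: read lock only, so concurrent watch registrations keep working.
        let candidates: Vec<Vec<u8>> = watchers
            .read()
            .unwrap()
            .iter()
            .filter(|(_, (tx, _))| tx.receiver_count() == 1)
            .map(|(key, _)| key.clone())
            .collect();

        // Phase 2: write lock, re-checking each key before removing it.
        let mut cleared = 0;
        if !candidates.is_empty() {
            let mut map = watchers.write().unwrap();
            for key in candidates {
                if map.get(&key).map_or(false, |(tx, _)| tx.receiver_count() == 1) {
                    map.remove(&key);
                    cleared += 1;
                }
            }
        }
        cleared
    }

    fn main() {
        let watchers: Watchers = RwLock::new(HashMap::new());
        let (tx, rx) = watch::channel(());
        watchers.write().unwrap().insert(b"prefix".to_vec(), (tx, rx));

        // No caller holds a second Receiver, so the lone entry is pruned.
        assert_eq!(prune_dead(&watchers), 1);
    }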
