diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index c922156..17dbf66 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -16,6 +16,7 @@ pub struct RocksDbEngineTree<'a> { db: Arc, name: &'a str, watchers: RwLock, Vec>>>, + write_lock: RwLock<()> } impl DatabaseEngine for Engine { @@ -77,6 +78,7 @@ impl DatabaseEngine for Engine { name, db: Arc::clone(self), watchers: RwLock::new(HashMap::new()), + write_lock: RwLock::new(()), })) } @@ -120,7 +122,13 @@ impl Tree for RocksDbEngineTree<'_> { } } - Ok(self.db.rocks.put_cf(self.cf(), key, value)?) + let lock = self.write_lock.read().unwrap(); + + let result = self.db.rocks.put_cf(self.cf(), key, value)?; + + drop(lock); + + Ok(result) } fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { @@ -168,20 +176,27 @@ impl Tree for RocksDbEngineTree<'_> { } fn increment(&self, key: &[u8]) -> Result> { - // TODO: make atomic + let lock = self.write_lock.write().unwrap(); + let old = self.db.rocks.get_cf(self.cf(), &key)?; let new = utils::increment(old.as_deref()).unwrap(); self.db.rocks.put_cf(self.cf(), key, &new)?; + + drop(lock); Ok(new) } fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let lock = self.write_lock.write().unwrap(); + for key in iter { let old = self.db.rocks.get_cf(self.cf(), &key)?; let new = utils::increment(old.as_deref()).unwrap(); self.db.rocks.put_cf(self.cf(), key, new)?; } + drop(lock); + Ok(()) }