|
|
|
@ -10,6 +10,7 @@ use std::{ |
|
|
|
pin::Pin, |
|
|
|
pin::Pin, |
|
|
|
sync::Arc, |
|
|
|
sync::Arc, |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
use thread_local::ThreadLocal; |
|
|
|
use tokio::sync::oneshot::Sender; |
|
|
|
use tokio::sync::oneshot::Sender; |
|
|
|
use tracing::debug; |
|
|
|
use tracing::debug; |
|
|
|
|
|
|
|
|
|
|
|
@ -40,6 +41,8 @@ impl<T> Drop for NonAliasingBox<T> { |
|
|
|
|
|
|
|
|
|
|
|
pub struct Engine { |
|
|
|
pub struct Engine { |
|
|
|
writer: Mutex<Connection>, |
|
|
|
writer: Mutex<Connection>, |
|
|
|
|
|
|
|
read_conn_tls: ThreadLocal<Connection>, |
|
|
|
|
|
|
|
read_iterator_conn_tls: ThreadLocal<Connection>, |
|
|
|
|
|
|
|
|
|
|
|
path: PathBuf, |
|
|
|
path: PathBuf, |
|
|
|
cache_size_per_thread: u32, |
|
|
|
cache_size_per_thread: u32, |
|
|
|
@ -62,34 +65,14 @@ impl Engine { |
|
|
|
self.writer.lock() |
|
|
|
self.writer.lock() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn read_lock(&self) -> &'static Connection { |
|
|
|
fn read_lock<'a>(&'a self) -> &'a Connection { |
|
|
|
READ_CONNECTION.with(|cell| { |
|
|
|
self.read_conn_tls |
|
|
|
let connection = &mut cell.borrow_mut(); |
|
|
|
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()) |
|
|
|
|
|
|
|
|
|
|
|
if (*connection).is_none() { |
|
|
|
|
|
|
|
let c = Box::leak(Box::new( |
|
|
|
|
|
|
|
Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap(), |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
**connection = Some(c); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
connection.unwrap() |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn read_lock_iterator(&self) -> &'static Connection { |
|
|
|
fn read_lock_iterator<'a>(&'a self) -> &'a Connection { |
|
|
|
READ_CONNECTION_ITERATOR.with(|cell| { |
|
|
|
self.read_iterator_conn_tls |
|
|
|
let connection = &mut cell.borrow_mut(); |
|
|
|
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()) |
|
|
|
|
|
|
|
|
|
|
|
if (*connection).is_none() { |
|
|
|
|
|
|
|
let c = Box::leak(Box::new( |
|
|
|
|
|
|
|
Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap(), |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
**connection = Some(c); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
connection.unwrap() |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn flush_wal(self: &Arc<Self>) -> Result<()> { |
|
|
|
pub fn flush_wal(self: &Arc<Self>) -> Result<()> { |
|
|
|
@ -105,15 +88,18 @@ impl DatabaseEngine for Engine { |
|
|
|
|
|
|
|
|
|
|
|
// calculates cache-size per permanent connection
|
|
|
|
// calculates cache-size per permanent connection
|
|
|
|
// 1. convert MB to KiB
|
|
|
|
// 1. convert MB to KiB
|
|
|
|
// 2. divide by permanent connections
|
|
|
|
// 2. divide by permanent connections + permanent iter connections + write connection
|
|
|
|
// 3. round down to nearest integer
|
|
|
|
// 3. round down to nearest integer
|
|
|
|
let cache_size_per_thread: u32 = |
|
|
|
let cache_size_per_thread: u32 = ((config.db_cache_capacity_mb * 1024.0) |
|
|
|
((config.db_cache_capacity_mb * 1024.0) / (num_cpus::get().max(1) + 1) as f64) as u32; |
|
|
|
/ ((num_cpus::get().max(1) * 2) + 1) as f64) |
|
|
|
|
|
|
|
as u32; |
|
|
|
|
|
|
|
|
|
|
|
let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?); |
|
|
|
let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?); |
|
|
|
|
|
|
|
|
|
|
|
let arc = Arc::new(Engine { |
|
|
|
let arc = Arc::new(Engine { |
|
|
|
writer, |
|
|
|
writer, |
|
|
|
|
|
|
|
read_conn_tls: ThreadLocal::new(), |
|
|
|
|
|
|
|
read_iterator_conn_tls: ThreadLocal::new(), |
|
|
|
path, |
|
|
|
path, |
|
|
|
cache_size_per_thread, |
|
|
|
cache_size_per_thread, |
|
|
|
}); |
|
|
|
}); |
|
|
|
|