lmdb: use atomic open txs and stores count
This commit is contained in:
+36
-10
@@ -18,7 +18,7 @@ use heed::types::Bytes;
|
||||
use heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, WithoutTls};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
@@ -96,10 +96,10 @@ static ENV_MAP: OnceLock<RwLock<HashMap<String, EnvState>>> = OnceLock::new();
|
||||
/// State of active database environment.
|
||||
struct EnvState {
|
||||
env: Env<WithoutTls>,
|
||||
open_txs_count: u32,
|
||||
open_txs_count: AtomicU32,
|
||||
resizing: AtomicBool,
|
||||
resize_checking: AtomicBool,
|
||||
stores_count: u32,
|
||||
stores_count: AtomicU32,
|
||||
}
|
||||
|
||||
/// LMDB-backed store facilitating data access and serialization. All writes
|
||||
@@ -117,7 +117,16 @@ impl Drop for Store {
|
||||
fn drop(&mut self) {
|
||||
{
|
||||
let mut w_map = ENV_MAP.get().unwrap().write();
|
||||
w_map.get_mut(&self.env_path).unwrap().stores_count -= 1;
|
||||
let stores_count = w_map
|
||||
.get(&self.env_path)
|
||||
.unwrap()
|
||||
.stores_count
|
||||
.load(Ordering::Relaxed);
|
||||
w_map
|
||||
.get_mut(&self.env_path)
|
||||
.unwrap()
|
||||
.stores_count
|
||||
.store(stores_count - 1, Ordering::Relaxed);
|
||||
}
|
||||
let no_stores = {
|
||||
ENV_MAP
|
||||
@@ -127,6 +136,7 @@ impl Drop for Store {
|
||||
.get(&self.env_path)
|
||||
.unwrap()
|
||||
.stores_count
|
||||
.load(Ordering::Relaxed)
|
||||
== 0
|
||||
};
|
||||
if no_stores {
|
||||
@@ -194,15 +204,24 @@ impl Store {
|
||||
full_path.clone(),
|
||||
EnvState {
|
||||
env,
|
||||
open_txs_count: 0,
|
||||
open_txs_count: AtomicU32::new(0),
|
||||
resizing: AtomicBool::new(false),
|
||||
resize_checking: AtomicBool::new(false),
|
||||
stores_count: 1,
|
||||
stores_count: AtomicU32::new(1),
|
||||
},
|
||||
);
|
||||
} else {
|
||||
let mut w_env_map = env_map.write();
|
||||
w_env_map.get_mut(&full_path).unwrap().stores_count += 1;
|
||||
let stores_count = w_env_map
|
||||
.get(&full_path)
|
||||
.unwrap()
|
||||
.stores_count
|
||||
.load(Ordering::Relaxed);
|
||||
w_env_map
|
||||
.get_mut(&full_path)
|
||||
.unwrap()
|
||||
.stores_count
|
||||
.store(stores_count + 1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// Database setup.
|
||||
@@ -300,6 +319,7 @@ impl Store {
|
||||
.get(&self.env_path)
|
||||
.unwrap()
|
||||
.open_txs_count
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Check if requirement for environment resize is checking.
|
||||
@@ -372,7 +392,8 @@ impl Store {
|
||||
.read()
|
||||
.get(&env_path)
|
||||
.unwrap()
|
||||
.open_txs_count;
|
||||
.open_txs_count
|
||||
.load(Ordering::Relaxed);
|
||||
if txs_count == 0 {
|
||||
debug!("Start resizing DB {}", env_path);
|
||||
ENV_MAP
|
||||
@@ -564,10 +585,15 @@ impl TxCounter {
|
||||
fn on_change_tx_count(env_path: &String, inc: bool) {
|
||||
let mut w_env_map = ENV_MAP.get().unwrap().write();
|
||||
let env_state = w_env_map.get_mut(env_path).unwrap();
|
||||
let open_txs_count = env_state.open_txs_count.load(Ordering::Relaxed);
|
||||
if inc {
|
||||
env_state.open_txs_count += 1;
|
||||
env_state
|
||||
.open_txs_count
|
||||
.store(open_txs_count + 1, Ordering::Relaxed);
|
||||
} else {
|
||||
env_state.open_txs_count -= 1;
|
||||
env_state
|
||||
.open_txs_count
|
||||
.store(open_txs_count - 1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user