lmdb: immediately set resizing flag, ignore resizing flag while there are more than 0 opened txs to avoid stuck, optimize tx counter for some operations

This commit is contained in:
ardocrat
2026-06-02 01:25:06 +03:00
parent 650df0f3b7
commit d2ead628ee
+68 -71
View File
@@ -389,7 +389,7 @@ impl Store {
/// Wait while database is resizing.
fn wait_for_resize(&self) {
loop {
if !ENV_MAP
if ENV_MAP
.get()
.unwrap()
.read()
@@ -397,11 +397,13 @@ impl Store {
.unwrap()
.resizing
.load(Ordering::Relaxed)
&& self.open_txs_count() == 0
{
break;
debug!("Wait on resizing DB {}", self.env_path);
thread::sleep(Duration::from_millis(100));
continue;
}
trace!("Wait on resizing DB {}", self.env_path);
thread::sleep(Duration::from_millis(100));
break;
}
}
@@ -421,9 +423,16 @@ impl Store {
let env_path = self.env_path.clone();
let env = self.env.clone();
{
let mut w_env_map = ENV_MAP.get().unwrap().write();
let env_state = w_env_map.get_mut(&env_path).unwrap();
env_state.resizing.store(true, Ordering::Relaxed);
}
// Resize immediately or at another thread to not interrupt current
// transaction waiting all open transactions to be closed.
if self.open_txs_count() != 0 {
debug!("Waiting txs to be closed before DB {} resize", env_path);
thread::spawn(move || {
loop {
let txs_count = ENV_MAP
@@ -436,19 +445,9 @@ impl Store {
.load(Ordering::Relaxed);
if txs_count == 0 {
debug!("Start resizing DB {}", env_path);
ENV_MAP
.get()
.unwrap()
.write()
.get_mut(&env_path)
.unwrap()
.resizing
.store(true, Ordering::Relaxed);
// Wait to make sure there are no more active txs left.
thread::sleep(Duration::from_millis(1000));
break;
}
thread::sleep(Duration::from_millis(10));
thread::sleep(Duration::from_millis(100));
}
unsafe {
@@ -465,17 +464,15 @@ impl Store {
});
return;
} else {
let mut w_env_map = ENV_MAP.get().unwrap().write();
let env_state = w_env_map.get_mut(&env_path).unwrap();
debug!("Start immediate resizing DB {}", env_path);
env_state.resizing.store(true, Ordering::Relaxed);
unsafe {
match env.resize(new_size) {
Ok(_) => debug!("End resizing DB {}", env_path),
Err(e) => error!("Resize DB {} error: {:?}", env_path, e),
}
}
let mut w_env_map = ENV_MAP.get().unwrap().write();
let env_state = w_env_map.get_mut(&env_path).unwrap();
env_state.resizing.store(false, Ordering::Relaxed);
}
}
@@ -599,27 +596,27 @@ impl Store {
self.wait_for_resize();
TxCounter::on_change_tx_count(&self.env_path, true);
match self.env.clone().static_read_txn() {
Ok(read) => {
let db_res = self.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
self,
Arc::new(db.clone()),
read,
deserialize,
)),
Err(e) => {
TxCounter::on_change_tx_count(&self.env_path, false);
Err(Error::from(e))
let res = {
match self.env.clone().static_read_txn() {
Ok(read) => {
let db_res = self.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
self,
Arc::new(db.clone()),
read,
deserialize,
)),
Err(e) => Err(Error::from(e)),
}
}
Err(e) => Err(Error::from(e)),
}
Err(e) => {
TxCounter::on_change_tx_count(&self.env_path, false);
Err(Error::from(e))
}
};
if res.is_err() {
TxCounter::on_change_tx_count(&self.env_path, false);
}
res
}
/// Builds a new batch to be used with this store.
@@ -627,13 +624,11 @@ impl Store {
self.maybe_resize();
TxCounter::on_change_tx_count(&self.env_path, true);
match Batch::new(self) {
Ok(batch) => Ok(batch),
Err(e) => {
TxCounter::on_change_tx_count(&self.env_path, false);
Err(e)
}
let res = { Batch::new(self) };
if res.is_err() {
TxCounter::on_change_tx_count(&self.env_path, false);
}
res
}
}
@@ -763,28 +758,27 @@ impl<'a> Batch<'a> {
self.store.wait_for_resize();
TxCounter::on_change_tx_count(&self.store.env_path, true);
let read = self.write.nested_read_txn();
match read {
Ok(read) => {
let db_res = self.store.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
self.store,
Arc::new(db.clone()),
read,
deserialize,
)),
Err(e) => {
TxCounter::on_change_tx_count(&self.store.env_path, false);
Err(Error::from(e))
let res = {
match self.write.nested_read_txn() {
Ok(read) => {
let db_res = self.store.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
self.store,
Arc::new(db.clone()),
read,
deserialize,
)),
Err(e) => Err(Error::from(e)),
}
}
Err(e) => Err(Error::from(e)),
}
Err(e) => {
TxCounter::on_change_tx_count(&self.store.env_path, false);
Err(Error::from(e))
}
};
if res.is_err() {
TxCounter::on_change_tx_count(&self.store.env_path, false);
}
res
}
/// Gets a `Readable` value from the database by provided key and deserialization strategy.
@@ -823,19 +817,22 @@ impl<'a> Batch<'a> {
/// commit, abandoned otherwise.
pub fn child(&mut self) -> Result<Batch<'_>, Error> {
TxCounter::on_change_tx_count(&self.store.env_path, true);
match self.store.env.nested_write_txn(&mut self.write) {
Ok(write) => Ok(Batch {
store: self.store,
write,
tx_counter: TxCounter {
env_path: self.store.env_path.clone(),
},
}),
Err(e) => {
TxCounter::on_change_tx_count(&self.store.env_path, false);
Err(Error::from(e))
let res = {
match self.store.env.nested_write_txn(&mut self.write) {
Ok(write) => Ok(Batch {
store: self.store,
write,
tx_counter: TxCounter {
env_path: self.store.env_path.clone(),
},
}),
Err(e) => Err(Error::from(e)),
}
};
if res.is_err() {
TxCounter::on_change_tx_count(&self.store.env_path, false);
}
res
}
}