lmdb: use static reader for iterator, count existing batches for stable resize

This commit is contained in:
ardocrat
2026-04-24 18:13:30 +03:00
parent 70040d2160
commit deb5b49310
3 changed files with 114 additions and 75 deletions
+2 -4
View File
@@ -309,9 +309,7 @@ impl<'a> Batch<'a> {
}
/// Iterator over the output_pos index.
pub fn output_pos_iter(
&self,
) -> Result<impl Iterator<Item = (Vec<u8>, CommitPos)> + 'a, Error> {
pub fn output_pos_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, CommitPos)>, Error> {
let key = to_key(OUTPUT_POS_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |k, mut v| {
@@ -440,7 +438,7 @@ impl<'a> Batch<'a> {
/// Iterator over raw data for full blocks in the db.
/// Used during block migration (we need flexibility around deserialization).
pub fn blocks_raw_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a, Error> {
pub fn blocks_raw_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)>, Error> {
let key = to_key(BLOCK_PREFIX, "");
self.db.iter(&key, |k, v| Ok((k.to_vec(), v.to_vec())))
}
+1 -1
View File
@@ -184,7 +184,7 @@ pub struct PeersIterBatch<'a> {
impl<'a> PeersIterBatch<'a> {
/// Iterator over all known peers.
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData> + 'a, Error> {
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
+111 -70
View File
@@ -85,6 +85,8 @@ const DEFAULT_ENV_NAME: &'static str = "lmdb";
/// Mapping of database path to environment.
static ENV_MAP: OnceLock<Arc<RwLock<HashMap<String, Env<WithoutTls>>>>> = OnceLock::new();
/// Mapping of database path to count of active batches to wait before resizing.
static ENV_BATCHES_COUNT: OnceLock<Arc<RwLock<HashMap<String, u32>>>> = OnceLock::new();
/// Mapping of database path to check if database is resizing.
static ENV_RESIZING: OnceLock<Arc<RwLock<HashMap<String, bool>>>> = OnceLock::new();
@@ -146,7 +148,7 @@ impl Store {
}
env_options.open(&full_path)?
};
let (resize, new_size) = Self::needs_resize(&env, alloc_chunk_size);
let (resize, new_size) = needs_resize(&env, alloc_chunk_size);
if resize {
unsafe {
env.resize(new_size)?;
@@ -238,54 +240,6 @@ impl Store {
self.version
}
/// Determines whether the environment needs a resize based on a simple percentage threshold.
pub fn needs_resize(env: &Env<WithoutTls>, alloc_chunk_size: usize) -> (bool, usize) {
let env_info = env.info();
let stat = env.stat();
let size_used = stat.page_size as usize * env_info.last_page_number;
trace!("DB map size: {}", env_info.map_size);
trace!("Space used: {}", size_used);
trace!("Space remaining: {}", env_info.map_size - size_used);
let resize_percent = RESIZE_PERCENT;
trace!(
"Percent used: {:.*} Percent threshold: {:.*}",
4,
size_used as f64 / env_info.map_size as f64,
4,
resize_percent
);
let resize = if size_used as f32 / env_info.map_size as f32 > resize_percent
|| env_info.map_size < alloc_chunk_size
{
trace!("Resize threshold met (percent-based)");
true
} else {
trace!("Resize threshold not met (percent-based)");
false
};
let new_size = if resize {
if env_info.map_size < alloc_chunk_size {
alloc_chunk_size
} else {
let mut tot = env_info.map_size - (env_info.map_size % alloc_chunk_size);
while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT {
tot += alloc_chunk_size;
}
tot
}
} else {
env_info.map_size
};
if resize {
debug!("Resizing DB to {} from {}", new_size, env_info.map_size);
}
(resize, new_size)
}
/// Gets a value from the db, provided its key.
/// Deserializes the retrieved data using the provided function.
fn get_with<F, T>(&self, key: &[u8], read: &RoTxn, deserialize: F) -> Result<Option<T>, Error>
@@ -326,16 +280,12 @@ impl Store {
}
/// Produces an iterator from the provided key prefix.
pub fn iter<F, T>(
&'_ self,
prefix: &[u8],
deserialize: F,
) -> Result<PrefixIterator<'_, F, T>, Error>
pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
self.wait_for_resize();
let read = self.env.read_txn()?;
let read = self.env.clone().static_read_txn()?;
Ok(PrefixIterator::new(
self.db.clone(),
read,
@@ -356,14 +306,14 @@ impl Store {
break;
}
debug!("Wait on {}, resizing DB", self.name);
thread::sleep(Duration::from_millis(500));
thread::sleep(Duration::from_millis(100));
}
}
/// Resize database environment if needed.
fn maybe_resize(&self) -> Result<(), Error> {
self.wait_for_resize();
let (resize, new_size) = Self::needs_resize(&self.env, self.alloc_chunk_size);
let (resize, new_size) = needs_resize(&self.env, self.alloc_chunk_size);
if resize {
let res_map = ENV_RESIZING.get().unwrap();
{
@@ -372,7 +322,14 @@ impl Store {
}
debug!("Start resizing {} DB", self.name);
unsafe {
thread::sleep(Duration::from_millis(2000));
let batches_count =
ENV_BATCHES_COUNT.get_or_init(|| Arc::new(RwLock::new(HashMap::new())));
let batches = batches_count.write();
let cur = batches.get(&self.env_path).unwrap_or(&0);
debug!("Wait {} batches to complete", cur);
while cur != &0 {
thread::sleep(Duration::from_millis(100));
}
self.env.resize(new_size)?;
}
{
@@ -387,21 +344,41 @@ impl Store {
/// Builds a new batch to be used with this store.
pub fn batch(&self) -> Result<Batch<'_>, Error> {
self.maybe_resize()?;
on_change_batches_count(&self.env_path, true);
Ok(Batch::new(self)?)
}
}
/// Batches counter to decrement value on drop.
struct BatchesCounter<'a> {
env_path: &'a String,
}
impl Drop for BatchesCounter<'_> {
fn drop(&mut self) {
on_change_batches_count(&self.env_path, false);
}
}
/// Batch to write multiple Writeables to db in an atomic manner.
pub struct Batch<'a> {
store: &'a Store,
write: RwTxn<'a>,
#[allow(dead_code)]
counter: BatchesCounter<'a>,
}
impl<'a> Batch<'a> {
/// Creates a new batch for provided db.
pub fn new(store: &'a Store) -> Result<Batch<'a>, Error> {
let write = store.env.write_txn()?;
Ok(Batch { store, write })
Ok(Batch {
store,
write,
counter: BatchesCounter {
env_path: &store.env_path,
},
})
}
/// Writes a single key/value pair to the db.
@@ -455,11 +432,7 @@ impl<'a> Batch<'a> {
}
/// Produces an iterator from the provided key prefix.
pub fn iter<F, T>(
&self,
prefix: &[u8],
deserialize: F,
) -> Result<PrefixIterator<'a, F, T>, Error>
pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
@@ -500,28 +473,32 @@ impl<'a> Batch<'a> {
/// commit, abandoned otherwise.
pub fn child(&mut self) -> Result<Batch<'_>, Error> {
self.store.maybe_resize()?;
on_change_batches_count(&self.store.env_path, true);
let write = self.store.env.nested_write_txn(&mut self.write)?;
Ok(Batch {
store: self.store,
write,
counter: BatchesCounter {
env_path: &self.store.env_path,
},
})
}
}
/// An iterator based on key prefix.
/// Caller is responsible for deserialization of the data.
pub struct PrefixIterator<'a, F, T>
pub struct PrefixIterator<F, T>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
db: Arc<Database<Bytes, Bytes>>,
read: Arc<RoTxn<'a, WithoutTls>>,
read: Arc<RoTxn<'static, WithoutTls>>,
keys: Vec<Vec<u8>>,
skip: usize,
deserialize: F,
}
impl<'a, F, T> Iterator for PrefixIterator<'a, F, T>
impl<F, T> Iterator for PrefixIterator<F, T>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
@@ -544,17 +521,17 @@ where
}
}
impl<'a, F, T> PrefixIterator<'a, F, T>
impl<F, T> PrefixIterator<F, T>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
/// Initialize a new prefix iterator.
pub fn new(
db: Arc<Database<Bytes, Bytes>>,
read: RoTxn<'a, WithoutTls>,
read: RoTxn<'static, WithoutTls>,
prefix: &[u8],
deserialize: F,
) -> PrefixIterator<'a, F, T> {
) -> PrefixIterator<F, T> {
let keys = if let Ok(iter) = db.prefix_iter(&read, &prefix) {
iter.move_between_keys()
.filter(|kv| kv.is_ok())
@@ -572,3 +549,67 @@ where
}
}
}
/// Determines whether the environment needs a resize based on a simple percentage threshold.
pub fn needs_resize(env: &Env<WithoutTls>, alloc_chunk_size: usize) -> (bool, usize) {
let env_info = env.info();
let stat = env.stat();
let size_used = stat.page_size as usize * env_info.last_page_number;
trace!("DB map size: {}", env_info.map_size);
trace!("Space used: {}", size_used);
trace!("Space remaining: {}", env_info.map_size - size_used);
let resize_percent = RESIZE_PERCENT;
trace!(
"Percent used: {:.*} Percent threshold: {:.*}",
4,
size_used as f64 / env_info.map_size as f64,
4,
resize_percent
);
let resize = if size_used as f32 / env_info.map_size as f32 > resize_percent
|| env_info.map_size < alloc_chunk_size
{
trace!("Resize threshold met (percent-based)");
true
} else {
trace!("Resize threshold not met (percent-based)");
false
};
let new_size = if resize {
if env_info.map_size < alloc_chunk_size {
alloc_chunk_size
} else {
let mut tot = env_info.map_size - (env_info.map_size % alloc_chunk_size);
while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT {
tot += alloc_chunk_size;
}
tot
}
} else {
env_info.map_size
};
if resize {
debug!("Resizing DB to {} from {}", new_size, env_info.map_size);
}
(resize, new_size)
}
/// Increment or decrement active batches count for current environment.
fn on_change_batches_count(env_path: &String, inc: bool) {
let batches_count = ENV_BATCHES_COUNT.get_or_init(|| Arc::new(RwLock::new(HashMap::new())));
let mut w_batches = batches_count.write();
let batches = w_batches.clone();
let count = {
let cur = batches.get(env_path).unwrap_or(&0);
if inc {
cur + 1
} else {
cur - 1
}
};
w_batches.insert(env_path.clone(), count);
}