Merge branch 'lmdb_update' into grim

This commit is contained in:
ardocrat
2026-05-26 16:30:10 +03:00
7 changed files with 100 additions and 47 deletions
+10 -5
View File
@@ -1293,19 +1293,24 @@ impl Chain {
return Ok(()); return Ok(());
} }
let mut count = 0;
let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?; let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?;
let tail = batch.get_block_header(&tail_hash)?; let tail = batch.get_block_header(&tail_hash)?;
// Remove old blocks (including short lived fork blocks) which height < tail.height // Remove old blocks (including short-lived fork blocks) which height < tail.height
for block in batch.blocks_iter()? { let mut blocks_to_delete = vec![];
let iter = batch.blocks_iter()?;
for block in iter {
if let Ok(block) = block { if let Ok(block) = block {
if block.header.height < tail.height { if block.header.height < tail.height {
let _ = batch.delete_block(&block.hash()); blocks_to_delete.push(block.hash());
count += 1;
} }
} }
} }
let mut count = 0;
for bh in blocks_to_delete {
let _ = batch.delete_block(&bh);
count += 1;
}
batch.save_body_tail(&Tip::from_header(&tail))?; batch.save_body_tail(&Tip::from_header(&tail))?;
+20 -6
View File
@@ -405,22 +405,36 @@ impl<T: PosEntry> RewindableListIndex for MultiIndex<T> {
impl<T: PosEntry> PruneableListIndex for MultiIndex<T> { impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
fn clear(&self, batch: &mut Batch<'_>) -> Result<(), Error> { fn clear(&self, batch: &mut Batch<'_>) -> Result<(), Error> {
let mut list_count = 0; let mut lists_to_delete = vec![];
let mut entry_count = 0;
let list_db_key = Some(self.list_prefix); let list_db_key = Some(self.list_prefix);
for key in batch.db.iter(list_db_key, |k, _| Ok(k.to_vec()))? { for key in batch.db.iter(list_db_key, |k, _| Ok(k.to_vec()))? {
if let Ok(key) = key { if let Ok(key) = key {
let _ = batch.delete(list_db_key, &key); lists_to_delete.push(key);
list_count += 1;
} }
} }
let mut list_count = 0;
for l in lists_to_delete {
match batch.delete(list_db_key, &l) {
Ok(_) => list_count += 1,
Err(_) => {}
}
}
let mut entries_to_delete = vec![];
let entry_db_key = Some(self.entry_prefix); let entry_db_key = Some(self.entry_prefix);
for key in batch.db.iter(entry_db_key, |k, _| Ok(k.to_vec()))? { for key in batch.db.iter(entry_db_key, |k, _| Ok(k.to_vec()))? {
if let Ok(key) = key { if let Ok(key) = key {
let _ = batch.delete(entry_db_key, &key); entries_to_delete.push(key);
entry_count += 1;
} }
} }
let mut entry_count = 0;
for e in entries_to_delete {
match batch.delete(entry_db_key, &e) {
Ok(_) => entry_count += 1,
Err(_) => {}
}
}
debug!( debug!(
"clear: lists deleted: {}, entries deleted: {}", "clear: lists deleted: {}, entries deleted: {}",
list_count, entry_count list_count, entry_count
+5 -5
View File
@@ -340,8 +340,8 @@ impl<'a> Batch<'a> {
/// Iterator over the output_pos index. /// Iterator over the output_pos index.
pub fn output_pos_iter( pub fn output_pos_iter(
&self, &'a self,
) -> Result<impl Iterator<Item = Result<(Vec<u8>, CommitPos), Error>>, Error> { ) -> Result<impl Iterator<Item = Result<(Vec<u8>, CommitPos), Error>> + 'a, Error> {
let protocol_version = self.db.protocol_version(); let protocol_version = self.db.protocol_version();
self.db.iter(Some(OUTPUT_POS_PREFIX), move |k, mut v| { self.db.iter(Some(OUTPUT_POS_PREFIX), move |k, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
@@ -461,7 +461,7 @@ impl<'a> Batch<'a> {
/// Iterator over all full blocks in the db. /// Iterator over all full blocks in the db.
/// Uses default db serialization strategy via db protocol version. /// Uses default db serialization strategy via db protocol version.
pub fn blocks_iter(&self) -> Result<impl Iterator<Item = Result<Block, Error>> + 'a, Error> { pub fn blocks_iter(&'a self) -> Result<impl Iterator<Item = Result<Block, Error>> + 'a, Error> {
let protocol_version = self.db.protocol_version(); let protocol_version = self.db.protocol_version();
self.db.iter(Some(BLOCK_PREFIX), move |_, mut v| { self.db.iter(Some(BLOCK_PREFIX), move |_, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
@@ -472,8 +472,8 @@ impl<'a> Batch<'a> {
/// Iterator over raw data for full blocks in the db. /// Iterator over raw data for full blocks in the db.
/// Used during block migration (we need flexibility around deserialization). /// Used during block migration (we need flexibility around deserialization).
pub fn blocks_raw_iter( pub fn blocks_raw_iter(
&self, &'a self,
) -> Result<impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>>, Error> { ) -> Result<impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a, Error> {
self.db self.db
.iter(Some(BLOCK_PREFIX), |k, v| Ok((k.to_vec(), v.to_vec()))) .iter(Some(BLOCK_PREFIX), |k, v| Ok((k.to_vec(), v.to_vec())))
} }
+7 -3
View File
@@ -643,7 +643,7 @@ impl TxHashSet {
// Iterate over the current output_pos index, removing any entries that // Iterate over the current output_pos index, removing any entries that
// do not point to to the expected output. // do not point to to the expected output.
let mut removed_count = 0; let mut pos_to_delete = vec![];
for kp in batch.output_pos_iter()? { for kp in batch.output_pos_iter()? {
if let Ok((key, pos1)) = kp { if let Ok((key, pos1)) = kp {
let pos0 = pos1.pos - 1; let pos0 = pos1.pos - 1;
@@ -658,10 +658,14 @@ impl TxHashSet {
} }
} }
} }
batch.delete(Some(store::OUTPUT_POS_PREFIX), &key)?; pos_to_delete.push(key);
removed_count += 1;
} }
} }
let mut removed_count = 0;
for p in pos_to_delete {
batch.delete(Some(store::OUTPUT_POS_PREFIX), &p)?;
removed_count += 1;
}
debug!( debug!(
"init_output_pos_index: removed {} stale index entries", "init_output_pos_index: removed {} stale index entries",
removed_count removed_count
+3 -1
View File
@@ -195,7 +195,9 @@ pub struct PeersIterBatch<'a> {
impl<'a> PeersIterBatch<'a> { impl<'a> PeersIterBatch<'a> {
/// Iterator over all known peers. /// Iterator over all known peers.
pub fn peers_iter(&self) -> Result<impl Iterator<Item = Result<PeerData, Error>>, Error> { pub fn peers_iter(
&'a self,
) -> Result<impl Iterator<Item = Result<PeerData, Error>> + 'a, Error> {
let protocol_version = self.db.protocol_version(); let protocol_version = self.db.protocol_version();
self.db.iter(Some(PEER_PREFIX), move |_, mut v| { self.db.iter(Some(PEER_PREFIX), move |_, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
+50 -21
View File
@@ -252,9 +252,15 @@ impl Store {
let migrate_from = Path::new(root_path).join(env_name); let migrate_from = Path::new(root_path).join(env_name);
if migrate_from.exists() { if migrate_from.exists() {
match s.migrate_to_default_env(db_name, &migrate_from) { match s.migrate_to_default_env(db_name, &migrate_from) {
Ok(_) => { Ok(_) => match fs::remove_dir_all(&migrate_from) {
let _ = fs::remove_dir_all(&migrate_from); Ok(_) => {}
} Err(e) => {
return Err(Error::FileErr(format!(
"Can not remove old DB file: {:?}",
e
)));
}
},
Err(e) => { Err(e) => {
error!("DB {} migration error: {:?}", env_name, e); error!("DB {} migration error: {:?}", env_name, e);
match s.clear() { match s.clear() {
@@ -310,7 +316,7 @@ impl Store {
db.put(&mut write_to, key, &v)?; db.put(&mut write_to, key, &v)?;
count += 1; count += 1;
} else { } else {
error!("Migration: unknown DB key: {}", db_name[0]); warn!("Migration: unknown DB key: {}", db_name[0]);
} }
} else { } else {
self.def_db.put(&mut write_to, k, &v)?; self.def_db.put(&mut write_to, k, &v)?;
@@ -560,11 +566,11 @@ impl Store {
} }
/// Produces an iterator from the provided database key. /// Produces an iterator from the provided database key.
pub fn iter<F, T>( pub fn iter<'a, F, T>(
&self, &self,
db_key: Option<u8>, db_key: Option<u8>,
deserialize: F, deserialize: F,
) -> Result<DatabaseIterator<F, T>, Error> ) -> Result<DatabaseIterator<'a, F, T>, Error>
where where
F: Fn(&[u8], &[u8]) -> Result<T, Error>, F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{ {
@@ -662,7 +668,8 @@ impl<'a> Batch<'a> {
/// Writes a single key/value pair to the provided database key. /// Writes a single key/value pair to the provided database key.
pub fn put(&mut self, db_key: Option<u8>, key: &[u8], value: &[u8]) -> Result<(), Error> { pub fn put(&mut self, db_key: Option<u8>, key: &[u8], value: &[u8]) -> Result<(), Error> {
let db = self.store.get_db(db_key)?; let db = self.store.get_db(db_key)?;
db.put(&mut self.write, key, value)?; let w = &mut self.write;
db.put(w, key, value)?;
Ok(()) Ok(())
} }
@@ -724,14 +731,38 @@ impl<'a> Batch<'a> {
/// Produces an iterator from the provided database key. /// Produces an iterator from the provided database key.
pub fn iter<F, T>( pub fn iter<F, T>(
&self, &'a self,
db_key: Option<u8>, db_key: Option<u8>,
deserialize: F, deserialize: F,
) -> Result<DatabaseIterator<F, T>, Error> ) -> Result<DatabaseIterator<'a, F, T>, Error>
where where
F: Fn(&[u8], &[u8]) -> Result<T, Error>, F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{ {
self.store.iter(db_key, deserialize) self.store.wait_for_resize();
TxCounter::on_change_tx_count(&self.store.env_path, true);
let read = self.write.nested_read_txn();
match read {
Ok(read) => {
let db_res = self.store.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
self.store,
Arc::new(db.clone()),
read,
deserialize,
)),
Err(e) => {
TxCounter::on_change_tx_count(&self.store.env_path, false);
Err(Error::from(e))
}
}
}
Err(e) => {
TxCounter::on_change_tx_count(&self.store.env_path, false);
Err(Error::from(e))
}
}
} }
/// Gets a `Readable` value from the database by provided key and deserialization strategy. /// Gets a `Readable` value from the database by provided key and deserialization strategy.
@@ -788,12 +819,12 @@ impl<'a> Batch<'a> {
/// An iterator based on database key. /// An iterator based on database key.
/// Caller is responsible for deserialization of the data. /// Caller is responsible for deserialization of the data.
pub struct DatabaseIterator<F, T> pub struct DatabaseIterator<'a, F, T>
where where
F: Fn(&[u8], &[u8]) -> Result<T, Error>, F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{ {
db: Arc<Database<Bytes, Bytes>>, db: Arc<Database<Bytes, Bytes>>,
read: Arc<RoTxn<'static, WithoutTls>>, read: Arc<RoTxn<'a, WithoutTls>>,
keys: Vec<Vec<u8>>, keys: Vec<Vec<u8>>,
total_keys: usize, total_keys: usize,
skip_cur: usize, skip_cur: usize,
@@ -803,7 +834,7 @@ where
tx_counter: TxCounter, tx_counter: TxCounter,
} }
impl<F, T> Iterator for DatabaseIterator<F, T> impl<F, T> Iterator for DatabaseIterator<'_, F, T>
where where
F: Fn(&[u8], &[u8]) -> Result<T, Error>, F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{ {
@@ -811,15 +842,13 @@ where
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if let Some(k) = self.keys.iter().skip(self.skip_cur).next() { if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
self.skip_total += 1;
self.skip_cur += 1;
match self.db.get(&self.read, k) { match self.db.get(&self.read, k) {
Ok(v) => { Ok(v) => {
if let Some(v) = v { if let Some(v) = v {
return match (self.deserialize)(k, v) { return match (self.deserialize)(k, v) {
Ok(v) => { Ok(v) => Some(Ok(v)),
self.skip_total += 1;
self.skip_cur += 1;
Some(Ok(v))
}
Err(e) => { Err(e) => {
error!("db iter: error deserializing: {}", e); error!("db iter: error deserializing: {}", e);
Some(Err(Error::from(e))) Some(Err(Error::from(e)))
@@ -853,7 +882,7 @@ where
} }
} }
impl<F, T> DatabaseIterator<F, T> impl<'a, F, T> DatabaseIterator<'a, F, T>
where where
F: Fn(&[u8], &[u8]) -> Result<T, Error>, F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{ {
@@ -861,9 +890,9 @@ where
pub fn new( pub fn new(
store: &Store, store: &Store,
db: Arc<Database<Bytes, Bytes>>, db: Arc<Database<Bytes, Bytes>>,
read: RoTxn<'static, WithoutTls>, read: RoTxn<'a, WithoutTls>,
deserialize: F, deserialize: F,
) -> DatabaseIterator<F, T> { ) -> DatabaseIterator<'a, F, T> {
let (keys, total_keys) = if let Ok(iter) = db.iter(&read) { let (keys, total_keys) = if let Ok(iter) = db.iter(&read) {
let total = iter.move_between_keys().count(); let total = iter.move_between_keys().count();
let keys = if let Ok(iter) = db.iter(&read) { let keys = if let Ok(iter) = db.iter(&read) {
+5 -6
View File
@@ -118,13 +118,12 @@ fn test_iter() -> Result<(), store::Error> {
let mut batch = store.batch()?; let mut batch = store.batch()?;
batch.put(Some(prefix), &key, &value)?; batch.put(Some(prefix), &key, &value)?;
// TODO - This is not currently possible (and we need to be aware of this).
// Currently our SerIterator is limited to using a ReadTransaction only.
//
// Check we can see the new entry via an iterator using the uncommitted batch. // Check we can see the new entry via an iterator using the uncommitted batch.
// let mut iter: SerIterator<Vec<u8>> = batch.iter(&[0])?; {
// assert_eq!(iter.next(), Some((key.to_vec(), value.to_vec()))); let mut iter = batch.iter(Some(prefix), |_, v| Ok(v.to_vec()))?;
// assert_eq!(iter.next(), None); assert_eq!(iter.next(), Some(Ok(value.to_vec())));
assert_eq!(iter.next(), None);
}
// Check we can not yet see the new entry via an iterator outside the uncommitted batch. // Check we can not yet see the new entry via an iterator outside the uncommitted batch.
let mut iter = store.iter(Some(prefix), |_, v| Ok(v.to_vec()))?; let mut iter = store.iter(Some(prefix), |_, v| Ok(v.to_vec()))?;