lmdb: handle errors at iterator
This commit is contained in:
+5
-3
@@ -1299,9 +1299,11 @@ impl Chain {
|
||||
|
||||
// Remove old blocks (including short lived fork blocks) which height < tail.height
|
||||
for block in batch.blocks_iter()? {
|
||||
if block.header.height < tail.height {
|
||||
let _ = batch.delete_block(&block.hash());
|
||||
count += 1;
|
||||
if let Ok(block) = block {
|
||||
if block.header.height < tail.height {
|
||||
let _ = batch.delete_block(&block.hash());
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -409,13 +409,17 @@ impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
|
||||
let mut entry_count = 0;
|
||||
let list_db_key = Some(self.list_prefix);
|
||||
for key in batch.db.iter(list_db_key, |k, _| Ok(k.to_vec()))? {
|
||||
let _ = batch.delete(list_db_key, &key);
|
||||
list_count += 1;
|
||||
if let Ok(key) = key {
|
||||
let _ = batch.delete(list_db_key, &key);
|
||||
list_count += 1;
|
||||
}
|
||||
}
|
||||
let entry_db_key = Some(self.entry_prefix);
|
||||
for key in batch.db.iter(entry_db_key, |k, _| Ok(k.to_vec()))? {
|
||||
let _ = batch.delete(entry_db_key, &key);
|
||||
entry_count += 1;
|
||||
if let Ok(key) = key {
|
||||
let _ = batch.delete(entry_db_key, &key);
|
||||
entry_count += 1;
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
"clear: lists deleted: {}, entries deleted: {}",
|
||||
|
||||
+7
-3
@@ -339,7 +339,9 @@ impl<'a> Batch<'a> {
|
||||
}
|
||||
|
||||
/// Iterator over the output_pos index.
|
||||
pub fn output_pos_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, CommitPos)>, Error> {
|
||||
pub fn output_pos_iter(
|
||||
&self,
|
||||
) -> Result<impl Iterator<Item = Result<(Vec<u8>, CommitPos), Error>>, Error> {
|
||||
let protocol_version = self.db.protocol_version();
|
||||
self.db.iter(Some(OUTPUT_POS_PREFIX), move |k, mut v| {
|
||||
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
|
||||
@@ -459,7 +461,7 @@ impl<'a> Batch<'a> {
|
||||
|
||||
/// Iterator over all full blocks in the db.
|
||||
/// Uses default db serialization strategy via db protocol version.
|
||||
pub fn blocks_iter(&self) -> Result<impl Iterator<Item = Block> + 'a, Error> {
|
||||
pub fn blocks_iter(&self) -> Result<impl Iterator<Item = Result<Block, Error>> + 'a, Error> {
|
||||
let protocol_version = self.db.protocol_version();
|
||||
self.db.iter(Some(BLOCK_PREFIX), move |_, mut v| {
|
||||
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
|
||||
@@ -469,7 +471,9 @@ impl<'a> Batch<'a> {
|
||||
|
||||
/// Iterator over raw data for full blocks in the db.
|
||||
/// Used during block migration (we need flexibility around deserialization).
|
||||
pub fn blocks_raw_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)>, Error> {
|
||||
pub fn blocks_raw_iter(
|
||||
&self,
|
||||
) -> Result<impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>>, Error> {
|
||||
self.db
|
||||
.iter(Some(BLOCK_PREFIX), |k, v| Ok((k.to_vec(), v.to_vec())))
|
||||
}
|
||||
|
||||
@@ -644,21 +644,23 @@ impl TxHashSet {
|
||||
// Iterate over the current output_pos index, removing any entries that
|
||||
// do not point to to the expected output.
|
||||
let mut removed_count = 0;
|
||||
for (key, pos1) in batch.output_pos_iter()? {
|
||||
let pos0 = pos1.pos - 1;
|
||||
if let Some(out) = output_pmmr.get_data(pos0) {
|
||||
if let Ok(pos0_via_mmr) = batch.get_output_pos(&out.commitment()) {
|
||||
// If the pos matches and the index key matches the commitment
|
||||
// then keep the entry, other we want to clean it up.
|
||||
if pos0 == pos0_via_mmr
|
||||
&& batch.is_match_output_pos_key(&key, &out.commitment())
|
||||
{
|
||||
continue;
|
||||
for kp in batch.output_pos_iter()? {
|
||||
if let Ok((key, pos1)) = kp {
|
||||
let pos0 = pos1.pos - 1;
|
||||
if let Some(out) = output_pmmr.get_data(pos0) {
|
||||
if let Ok(pos0_via_mmr) = batch.get_output_pos(&out.commitment()) {
|
||||
// If the pos matches and the index key matches the commitment
|
||||
// then keep the entry, other we want to clean it up.
|
||||
if pos0 == pos0_via_mmr
|
||||
&& batch.is_match_output_pos_key(&key, &out.commitment())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
batch.delete(Some(store::OUTPUT_POS_PREFIX), &key)?;
|
||||
removed_count += 1;
|
||||
}
|
||||
batch.delete(Some(store::OUTPUT_POS_PREFIX), &key)?;
|
||||
removed_count += 1;
|
||||
}
|
||||
debug!(
|
||||
"init_output_pos_index: removed {} stale index entries",
|
||||
|
||||
+4
-1
@@ -279,7 +279,10 @@ impl Peers {
|
||||
pub fn all_peer_data(&self) -> Vec<PeerData> {
|
||||
match self.store.iter_batch() {
|
||||
Ok(batch) => match batch.peers_iter() {
|
||||
Ok(iter) => iter.collect(),
|
||||
Ok(iter) => iter
|
||||
.filter(|p| p.is_ok())
|
||||
.map(|p| p.ok().unwrap())
|
||||
.collect(),
|
||||
Err(e) => {
|
||||
error!("failed to get all peer data: {:?}", e);
|
||||
vec![]
|
||||
|
||||
+12
-4
@@ -195,7 +195,7 @@ pub struct PeersIterBatch<'a> {
|
||||
|
||||
impl<'a> PeersIterBatch<'a> {
|
||||
/// Iterator over all known peers.
|
||||
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
|
||||
pub fn peers_iter(&self) -> Result<impl Iterator<Item = Result<PeerData, Error>>, Error> {
|
||||
let protocol_version = self.db.protocol_version();
|
||||
self.db.iter(Some(PEER_PREFIX), move |_, mut v| {
|
||||
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
|
||||
@@ -212,6 +212,8 @@ impl<'a> PeersIterBatch<'a> {
|
||||
) -> Result<Vec<PeerData>, Error> {
|
||||
let peers = self
|
||||
.peers_iter()?
|
||||
.filter(|p| p.is_ok())
|
||||
.map(|p| p.ok().unwrap())
|
||||
.filter(|p| p.flags == state && p.capabilities.contains(cap))
|
||||
.choose_multiple(&mut thread_rng(), count);
|
||||
Ok(peers)
|
||||
@@ -220,7 +222,11 @@ impl<'a> PeersIterBatch<'a> {
|
||||
/// List all known peers
|
||||
/// Used for /v1/peers/all api endpoint
|
||||
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
|
||||
let peers: Vec<PeerData> = self.peers_iter()?.collect();
|
||||
let peers: Vec<PeerData> = self
|
||||
.peers_iter()?
|
||||
.filter(|p| p.is_ok())
|
||||
.map(|p| p.ok().unwrap())
|
||||
.collect();
|
||||
Ok(peers)
|
||||
}
|
||||
|
||||
@@ -232,8 +238,10 @@ impl<'a> PeersIterBatch<'a> {
|
||||
let mut to_remove = vec![];
|
||||
|
||||
for x in self.peers_iter()? {
|
||||
if predicate(&x) {
|
||||
to_remove.push(x)
|
||||
if let Ok(x) = x {
|
||||
if predicate(&x) {
|
||||
to_remove.push(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+22
-24
@@ -770,35 +770,33 @@ impl<F, T> Iterator for DatabaseIterator<F, T>
|
||||
where
|
||||
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
|
||||
{
|
||||
type Item = T;
|
||||
type Item = Result<T, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if let Some(k) = self.keys.iter().skip(self.skip).next() {
|
||||
let v = self.db.get(&self.read, k).unwrap_or(None);
|
||||
if let Some(v) = v {
|
||||
return match (self.deserialize)(k, v) {
|
||||
Ok(v) => {
|
||||
self.skip += 1;
|
||||
self.skip_keys += 1;
|
||||
Some(v)
|
||||
match self.db.get(&self.read, k) {
|
||||
Ok(v) => {
|
||||
if let Some(v) = v {
|
||||
return match (self.deserialize)(k, v) {
|
||||
Ok(v) => {
|
||||
self.skip += 1;
|
||||
self.skip_keys += 1;
|
||||
Some(Ok(v))
|
||||
}
|
||||
Err(e) => {
|
||||
error!("db iter: error deserializing: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(_) => None,
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
return {
|
||||
error!("db iter: error read value: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if self.total_keys > self.skip_keys {
|
||||
let keys = if let Ok(iter) = self.db.iter(&self.read) {
|
||||
iter.move_between_keys()
|
||||
.skip(self.skip_keys)
|
||||
.take(10000)
|
||||
.filter(|kv| kv.is_ok())
|
||||
.map(|kv| kv.unwrap().0.to_vec())
|
||||
.collect::<Vec<Vec<u8>>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
self.skip = 0;
|
||||
self.keys = keys;
|
||||
return self.next();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user