Merge branch 'pibd_peers_fix' into grim

This commit is contained in:
ardocrat
2026-05-20 20:05:52 +03:00
4 changed files with 52 additions and 78 deletions
+1 -1
View File
@@ -585,7 +585,7 @@ impl Desegmenter {
// Explicitly add segment identifier to request if not exists.
let mut maybe_add_to_request = |seg_id: SegmentTypeIdentifier| {
if return_vec.iter().any(|i| i == &seg_id) {
if !return_vec.iter().any(|i| i == &seg_id) {
if return_vec.len() >= max_elements {
return_vec.pop();
}
-41
View File
@@ -13,7 +13,6 @@
// limitations under the License.
use crate::util::{Mutex, RwLock};
use chrono::Duration;
use lru_cache::LruCache;
use std::fmt;
use std::fs::File;
@@ -52,7 +51,6 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
enum State {
Connected,
Banned,
Blocked(DateTime<Utc>, u32),
}
pub struct Peer {
@@ -194,45 +192,6 @@ impl Peer {
State::Banned == *self.state.read()
}
/// Whether this peer has been blocked.
pub fn is_blocked(&self) -> bool {
match *self.state.read() {
State::Blocked(expiry, _) => expiry > Utc::now(),
_ => false,
}
}
/// Set this peer status to blocked.
pub fn set_blocked(&self) {
if self.is_blocked() {
return;
}
let times = {
match *self.state.read() {
State::Blocked(_, times) => times + 1,
_ => 1,
}
};
let duration = match times {
1 => 60, // 1m
2 => 180, // 3m
_ => 600, // 10m
};
let expiry = Utc::now() + Duration::seconds(duration);
*self.state.write() = State::Blocked(expiry, times);
debug!(
"state_sync: block peer {} for {} times: {}",
self.info.addr, duration, times
);
}
/// Unblock blocked peer.
pub fn unblock(&self) {
if self.is_blocked() {
*self.state.write() = State::Connected;
}
}
/// Whether this peer is stuck on sync.
pub fn is_stuck(&self) -> (bool, Difficulty) {
let peer_live_info = self.info.live_info.read();
+45 -27
View File
@@ -45,6 +45,7 @@ pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
blocked: RwLock<HashMap<PeerAddr, (DateTime<Utc>, u32)>>,
config: P2PConfig,
}
@@ -55,6 +56,7 @@ impl Peers {
store,
config,
peers: RwLock::new(HashMap::new()),
blocked: RwLock::new(HashMap::new()),
}
}
@@ -458,32 +460,55 @@ impl Peers {
}
}
/// Temporary block a peer without banning it.
pub fn block_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
let peers = self.peers.try_read_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("block_peer: failed to get peers lock");
Error::PeerException
})?;
match peers.get(&peer_addr) {
None => Err(Error::PeerNotFound),
Some(peer) => {
warn!("blocking peer {} ({})", peer_addr, reason);
peer.set_blocked();
Ok(())
/// Whether this peer has been blocked.
pub fn is_blocked(&self, peer_addr: PeerAddr) -> bool {
match self.blocked.try_read_for(LOCK_TIMEOUT) {
Some(peers) => match peers.get(&peer_addr) {
None => false,
Some((expiry, _)) => expiry > &Utc::now(),
},
None => {
error!("is_blocked: failed to get peers lock");
false
}
}
}
/// Unblock blocked peers.
pub fn unblock_peers(&self) -> Result<(), Error> {
let peers = self.peers.try_read_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("unblock_peers: failed to get peers lock");
/// Temporary block a peer without banning it.
pub fn block_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("block_peer: failed to get blocked lock");
Error::PeerException
})?;
let peers = peers.iter().into_iter();
let _ = peers
.filter(|(_, peer)| peer.is_blocked())
.map(|(_, peer)| peer.unblock());
let times = {
match blocked.get(&peer_addr) {
Some((_, times)) => times + 1,
None => 1,
}
};
let duration = match times {
1 => 60, // 1m
2 => 180, // 3m
_ => 600, // 10m
};
let expiry = Utc::now() + Duration::seconds(duration);
blocked.insert(peer_addr, (expiry, times));
warn!(
"state_sync: block peer {} ({}) for {} times: {}",
peer_addr, reason, duration, times
);
Ok(())
}
/// Unblock all blocked peers.
pub fn unblock_peers(&self) -> Result<(), Error> {
let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("unblock_peers: failed to get blocked lock");
Error::PeerException
})?;
blocked.clear();
Ok(())
}
@@ -849,13 +874,6 @@ impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
}
}
/// Filter non-blocked peers.
pub fn non_blocked(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
PeersIter {
iter: self.iter.filter(|p| !p.is_blocked()),
}
}
/// Filter peers with the provided difficulty comparison fn.
///
/// with_difficulty(|x| x > diff)
+6 -9
View File
@@ -377,7 +377,10 @@ impl StateSync {
// abort PIBD and fall back to txhashset download
// Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled
// peer having the max difficulty
if available_pibd_peers().count() == 0 {
if available_pibd_peers()
.with_filter(|p| !peers.is_blocked(p.info.addr))
.count() == 0
{
if let None = self.earliest_zero_pibd_peer_time {
self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
}
@@ -413,21 +416,15 @@ impl StateSync {
.and_then(|(_, addr)| *addr);
let peer = available_pibd_peers()
.outbound()
.non_blocked()
.with_filter(|p| !peers.is_blocked(p.info.addr))
.exclude(excluded_peer)
.choose_random()
.or_else(|| {
available_pibd_peers()
.inbound()
.non_blocked()
.with_filter(|p| !peers.is_blocked(p.info.addr))
.exclude(excluded_peer)
.choose_random()
.or_else(|| {
// Select from blocked if we have no peers (could be network issue).
available_pibd_peers()
.exclude(excluded_peer)
.choose_random()
})
});
trace!("Chosen peer is {:?}", peer);