peer: add an unknown state for new peers; check peer states on every monitor pass (up to 128 healthy non-connected + 128 defunct + 128 unknown); mark a peer as defunct when its ping fails; do not crash on TOML parsing when DNS resolution fails

This commit is contained in:
ardocrat
2026-04-04 01:58:35 +03:00
parent af0c1dca02
commit ab9715d9a9
4 changed files with 97 additions and 73 deletions
+25 -12
View File
@@ -142,6 +142,7 @@ impl Peers {
}
false
}
/// Ban a peer, disconnecting it if we're currently connected
pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) -> Result<(), Error> {
// Update the peer in peers db
@@ -178,6 +179,13 @@ impl Peers {
}
}
/// Check whether the stored record for `peer_addr` is flagged as healthy.
///
/// Returns `true` only when the peer store lookup succeeds and the record's
/// flags equal `State::Healthy`; a failed lookup yields `false`.
pub fn is_healthy(&self, peer_addr: PeerAddr) -> bool {
    match self.store.get_peer(peer_addr) {
        Ok(record) => record.flags == State::Healthy,
        // Lookup failed: there is no record to inspect, so report "not healthy".
        Err(_) => false,
    }
}
fn broadcast<F>(&self, obj_name: &str, inner: F) -> u32
where
F: Fn(&Peer) -> Result<bool, Error>,
@@ -261,6 +269,8 @@ impl Peers {
break;
}
};
// Mark peer as defunct after ping failure.
let _ = self.update_state(p.info.addr, State::Defunct);
p.stop();
peers.remove(&p.info.addr);
}
@@ -702,21 +712,24 @@ impl NetAdapter for Peers {
trace!("Received {} peer addrs, saving.", peer_addrs.len());
let mut to_save: Vec<PeerData> = Vec::new();
for pa in peer_addrs {
if let Ok(e) = self.exists_peer(pa) {
if e {
if let Ok(mut p) = self.get_peer(pa) {
if self.is_healthy(pa) || self.is_banned(pa) {
continue;
}
p.flags = State::Unknown;
to_save.push(p);
} else {
let peer = PeerData {
addr: pa,
capabilities: Capabilities::UNKNOWN,
user_agent: "".to_string(),
flags: State::Unknown,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: 0,
};
to_save.push(peer);
}
let peer = PeerData {
addr: pa,
capabilities: Capabilities::UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
};
to_save.push(peer);
}
if let Err(e) = self.save_peers(to_save) {
error!("Could not save received peer addresses: {:?}", e);
+2 -1
View File
@@ -27,13 +27,14 @@ const STORE_SUBPATH: &str = "peers";
const PEER_PREFIX: u8 = b'P';
// Types of messages
// Types of peers
enum_from_primitive! {
// State flag stored on a peer record (compared against `peer.flags`).
// Each variant carries an explicit integer discriminant.
// NOTE(review): plain `//` comments are used inside the macro on purpose;
// confirm enum_from_primitive! accepts attributes on variants before
// converting the per-variant notes below to `///` doc comments.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum State {
// Peer is flagged as healthy; monitor_peers samples non-connected healthy
// peers as connection candidates.
Healthy = 0,
// Peer has been banned (presumably set via Peers::ban_peer — TODO confirm).
Banned = 1,
// Peer failed a ping or an outbound connection attempt; monitor_peers
// still samples defunct peers for retry.
Defunct = 2,
// Peer address received in a peer list from another peer and saved without
// being marked healthy or banned.
Unknown = 3,
}
}
+7 -4
View File
@@ -183,10 +183,13 @@ impl<'de> Visitor<'de> for PeerAddrs {
Ok(ip) => peers.push(PeerAddr(ip)),
// If that fails it's probably a DNS record
Err(_) => {
let socket_addrs = entry.to_socket_addrs().map_err(|_| {
serde::de::Error::custom(format!("Unable to resolve DNS: {}", entry))
})?;
peers.append(&mut socket_addrs.map(PeerAddr).collect());
let socket_addrs: Result<std::vec::IntoIter<SocketAddr>, M::Error> =
entry.to_socket_addrs().map_err(|_| {
serde::de::Error::custom(format!("Unable to resolve DNS: {}", entry))
});
if let Ok(socket_addrs) = socket_addrs {
peers.append(&mut socket_addrs.map(PeerAddr).collect());
}
}
}
}
+63 -56
View File
@@ -133,9 +133,10 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
// regularly check if we need to acquire more peers and if so, gets
// them from db
let mut total_count = 0;
let mut healthy_count = 0;
let mut banned_count = 0;
let mut healthy = vec![];
let mut defuncts = vec![];
let mut unknown = vec![];
for x in peers.all_peer_data().into_iter() {
match x.flags {
@@ -154,8 +155,9 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
banned_count += 1;
}
}
p2p::State::Healthy => healthy_count += 1,
p2p::State::Defunct => defuncts.push(x),
p2p::State::Healthy => healthy.push(x.addr),
p2p::State::Defunct => defuncts.push(x.addr),
p2p::State::Unknown => unknown.push(x.addr),
}
total_count += 1;
}
@@ -167,15 +169,16 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
all {} = {} healthy + {} banned + {} defunct + {} unknown",
config.host,
config.port,
peers_count,
most_work_count,
total_count,
healthy_count,
healthy.len(),
banned_count,
defuncts.len(),
unknown.len()
);
// maintenance step first, clean up p2p server peers
@@ -185,64 +188,74 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
config.clone(),
);
if peers.enough_outbound_peers() {
return;
}
if !peers.enough_outbound_peers() {
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
// Attempt to connect to any preferred peers.
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
for p in peers_preferred {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
// Attempt to connect to any preferred peers.
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
for p in peers_preferred {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(p);
}
} else {
let _ = tx.send(p);
}
} else {
let _ = tx.send(p);
}
}
// take a random defunct peer and mark it healthy: over a long enough period any
// peer will see another as defunct eventually, gives us a chance to retry
if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
let _ = peers.update_state(peer.addr, p2p::State::Healthy);
}
// find some peers from our db
// and queue them up for a connection attempt
// intentionally make too many attempts (2x) as some (most?) will fail
// as many nodes in our db are not publicly accessible
let mut new_peers = vec![];
let max_peer_attempts = 128;
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
max_peer_attempts as usize,
);
// check random disconnected healthy peers.
for hpa in healthy
.iter()
.filter(|p| peers.get_connected_peer(**p).is_none())
.choose_multiple(&mut thread_rng(), max_peer_attempts)
{
new_peers.push(hpa);
}
// check random unknown peers received from peer list request.
for upa in unknown
.iter()
.choose_multiple(&mut thread_rng(), max_peer_attempts)
{
new_peers.push(upa);
}
// always check random defunct peers.
for dpa in defuncts
.iter()
.choose_multiple(&mut thread_rng(), max_peer_attempts)
{
new_peers.push(dpa);
}
// Only queue up connection attempts for candidate peers where we
// are confident we do not yet know about this peer.
// The call to is_known() may fail due to contention on the peers map.
// Do not attempt any connection where is_known() fails for any reason.
for p in new_peers {
if let Ok(false) = peers.is_known(p.addr) {
tx.send(p.addr).unwrap();
for pa in new_peers {
if let Ok(false) = peers.is_known(*pa) {
tx.send(*pa).unwrap();
}
}
}
@@ -275,7 +288,7 @@ fn connect_to_seeds_and_peers(
// check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability)
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 100);
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 128);
// if so, get their addresses, otherwise use our seeds
let peer_addrs = if peers.len() > 3 {
@@ -307,19 +320,14 @@ fn listen_for_addrs(
) {
// Pull everything currently on the queue off the queue.
// Does not block so addrs may be empty.
// We will take(max_peers) from this later but we want to drain the rx queue
// We will take(max_peers) from this later, but we want to drain the rx queue
// here to prevent it backing up.
let addrs: Vec<PeerAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here.
if peers.enough_outbound_peers() {
return;
}
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them for safety.
let connect_min_interval = 30;
let max_outbound_attempts = 128;
let max_outbound_attempts = 128 * 3;
for addr in addrs.into_iter().take(max_outbound_attempts) {
// ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now();
@@ -349,7 +357,6 @@ fn listen_for_addrs(
if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);