More reliable peers check (#3824)

* peer: unknown state for new peers, check peers state on every monitor (128 healthy non-connected + 128 defuncts + 128 unknown), mark peer as defunct when ping not passed, do not crash on toml parse with dns failure

* p2p: cleanup before selection at monitor, add outbound to connected list only when there is not enough peers + disconnect extra peer immediately, reconnect to seeds at monitor to avoid stuck, update only defunct state to unknown when received existing peer address

* p2p: reduced amount of total peers to check at monitor

* p2p: do not check healthy and defunct peers more often than once per hour, store last connection attempt, do not ask for more peers when there is enough outbound

* peer: update last_attempt when changing peer state to other than Banned

* fix: log of peers amount to check
This commit is contained in:
ardocrat
2026-04-09 18:55:22 +03:00
committed by GitHub
parent af0c1dca02
commit 90dab5fcc6
6 changed files with 188 additions and 137 deletions
+26 -14
View File
@@ -61,6 +61,7 @@ impl Peers {
/// Adds the peer to our internal peer mapping. Note that the peer is still
/// returned so the server can run it.
pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> {
let enough_outbound = self.enough_outbound_peers();
let peer_data: PeerData;
{
// Scope for peers vector lock - dont hold the peers lock while adding to lmdb
@@ -76,9 +77,12 @@ impl Peers {
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
last_attempt: Utc::now().timestamp(),
};
debug!("Adding newly connected peer {}.", peer_data.addr);
peers.insert(peer_data.addr, peer);
if !enough_outbound || !peer.info.is_outbound() {
debug!("Adding newly connected peer {}.", peer_data.addr);
peers.insert(peer_data.addr, peer);
}
}
debug!("Saving newly connected peer {}.", peer_data.addr);
if let Err(e) = self.save_peer(&peer_data) {
@@ -98,6 +102,7 @@ impl Peers {
last_banned: Utc::now().timestamp(),
ban_reason,
last_connected: Utc::now().timestamp(),
last_attempt: Utc::now().timestamp(),
};
debug!("Banning peer {}.", addr);
self.save_peer(&peer_data)
@@ -142,6 +147,7 @@ impl Peers {
}
false
}
/// Ban a peer, disconnecting it if we're currently connected
pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) -> Result<(), Error> {
// Update the peer in peers db
@@ -261,6 +267,8 @@ impl Peers {
break;
}
};
// Mark peer as defunct after ping failure.
let _ = self.update_state(p.info.addr, State::Defunct);
p.stop();
peers.remove(&p.info.addr);
}
@@ -702,21 +710,25 @@ impl NetAdapter for Peers {
trace!("Received {} peer addrs, saving.", peer_addrs.len());
let mut to_save: Vec<PeerData> = Vec::new();
for pa in peer_addrs {
if let Ok(e) = self.exists_peer(pa) {
if e {
if let Ok(mut p) = self.get_peer(pa) {
if p.flags != State::Defunct {
continue;
}
p.flags = State::Unknown;
to_save.push(p);
} else {
let peer = PeerData {
addr: pa,
capabilities: Capabilities::UNKNOWN,
user_agent: "".to_string(),
flags: State::Unknown,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: 0,
last_attempt: 0,
};
to_save.push(peer);
}
let peer = PeerData {
addr: pa,
capabilities: Capabilities::UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
};
to_save.push(peer);
}
if let Err(e) = self.save_peers(to_save) {
error!("Could not save received peer addresses: {:?}", e);
+3
View File
@@ -187,6 +187,9 @@ impl Server {
&self.handshake,
self.peers.clone(),
)?;
if self.peers.enough_outbound_peers() {
peer.stop();
}
let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?;
Ok(peer)
+15 -9
View File
@@ -27,13 +27,14 @@ const STORE_SUBPATH: &str = "peers";
const PEER_PREFIX: u8 = b'P';
// Types of messages
// Types of peers
enum_from_primitive! {
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum State {
Healthy = 0,
Banned = 1,
Defunct = 2,
Unknown = 3,
}
}
@@ -55,6 +56,8 @@ pub struct PeerData {
pub ban_reason: ReasonForBan,
/// Time when we last connected to this peer.
pub last_connected: i64,
/// Time when last connection attempt happened to this peer.
pub last_attempt: i64,
}
impl Writeable for PeerData {
@@ -67,7 +70,8 @@ impl Writeable for PeerData {
[write_u8, self.flags as u8],
[write_i64, self.last_banned],
[write_i32, self.ban_reason as i32],
[write_i64, self.last_connected]
[write_i64, self.last_connected],
[write_i64, self.last_attempt]
);
Ok(())
}
@@ -81,12 +85,10 @@ impl Readable for PeerData {
let (fl, lb, br) = ser_multiread!(reader, read_u8, read_i64, read_i32);
let lc = reader.read_i64();
// this only works because each PeerData is read in its own vector and this
// is the last data element
let last_connected = match lc {
Err(_) => Utc::now().timestamp(),
Ok(lc) => lc,
};
let last_connected = lc.unwrap_or_else(|_| Utc::now().timestamp());
let la = reader.read_i64();
let last_attempt = la.unwrap_or_else(|_| 0);
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
let capabilities = Capabilities::from_bits_truncate(capab);
@@ -97,10 +99,11 @@ impl Readable for PeerData {
addr,
capabilities,
user_agent,
flags: flags,
flags,
last_banned: lb,
ban_reason,
last_connected,
last_attempt,
}),
None => Err(ser::Error::CorruptedData),
}
@@ -187,6 +190,7 @@ impl PeerStore {
/// Convenience method to load a peer data, update its status and save it
/// back. If new state is Banned its last banned time will be updated too.
/// If new state is Defunct last connection attempt will be updated too.
pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> {
let batch = self.db.batch()?;
@@ -197,6 +201,8 @@ impl PeerStore {
peer.flags = new_state;
if new_state == State::Banned {
peer.last_banned = Utc::now().timestamp();
} else {
peer.last_attempt = Utc::now().timestamp();
}
batch.put_ser(&peer_key(peer_addr)[..], &peer)?;
+7 -4
View File
@@ -183,10 +183,13 @@ impl<'de> Visitor<'de> for PeerAddrs {
Ok(ip) => peers.push(PeerAddr(ip)),
// If that fails it's probably a DNS record
Err(_) => {
let socket_addrs = entry.to_socket_addrs().map_err(|_| {
serde::de::Error::custom(format!("Unable to resolve DNS: {}", entry))
})?;
peers.append(&mut socket_addrs.map(PeerAddr).collect());
let socket_addrs: Result<std::vec::IntoIter<SocketAddr>, M::Error> =
entry.to_socket_addrs().map_err(|_| {
serde::de::Error::custom(format!("Unable to resolve DNS: {}", entry))
});
if let Ok(socket_addrs) = socket_addrs {
peers.append(&mut socket_addrs.map(PeerAddr).collect());
}
}
}
}
+137 -92
View File
@@ -53,7 +53,6 @@ pub const TESTNET_DNS_SEEDS: &[&str] = &[
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
config: P2PConfig,
stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> {
@@ -67,7 +66,7 @@ pub fn connect_and_monitor(
let (tx, rx) = mpsc::channel();
// check seeds first
connect_to_seeds_and_peers(peers.clone(), tx.clone(), seed_list, config);
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
let mut prev = DateTime::<Utc>::MIN_UTC;
let mut prev_expire_check = DateTime::<Utc>::MIN_UTC;
@@ -130,12 +129,20 @@ pub fn connect_and_monitor(
}
fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sender<PeerAddr>) {
// maintenance step first, clean up p2p server peers
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
// regularly check if we need to acquire more peers and if so, gets
// them from db
let mut total_count = 0;
let mut healthy_count = 0;
let mut banned_count = 0;
let mut healthy = vec![];
let mut defuncts = vec![];
let mut unknown = vec![];
for x in peers.all_peer_data().into_iter() {
match x.flags {
@@ -154,8 +161,9 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
banned_count += 1;
}
}
p2p::State::Healthy => healthy_count += 1,
p2p::State::Healthy => healthy.push(x),
p2p::State::Defunct => defuncts.push(x),
p2p::State::Unknown => unknown.push(x.addr),
}
total_count += 1;
}
@@ -167,82 +175,112 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
all {} = {} healthy + {} banned + {} defunct + {} unknown",
config.host,
config.port,
peers_count,
most_work_count,
total_count,
healthy_count,
healthy.len(),
banned_count,
defuncts.len(),
unknown.len()
);
// maintenance step first, clean up p2p server peers
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
if peers.enough_outbound_peers() {
// Connect to seeds again if there is no peers at database,
// helps to avoid stuck when 1st request to seed list was failed.
if total_count == 0 {
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
return;
}
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
// Attempt to connect to any preferred peers.
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
for p in peers_preferred {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(p);
}
} else {
let _ = tx.send(p);
if !peers.enough_outbound_peers() {
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
}
// take a random defunct peer and mark it healthy: over a long enough period any
// peer will see another as defunct eventually, gives us a chance to retry
if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
let _ = peers.update_state(peer.addr, p2p::State::Healthy);
// Attempt to connect to any preferred peers.
let default_peers = PeerAddrs::default();
let peers_preferred = config.peers_preferred.as_ref().unwrap_or(&default_peers);
for p in peers_preferred.peers.iter() {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(*p);
}
} else {
let _ = tx.send(*p);
}
}
}
// find some peers from our db
// and queue them up for a connection attempt
// intentionally make too many attempts (2x) as some (most?) will fail
// as many nodes in our db are not publicly accessible
let mut new_peers = vec![];
let max_peer_attempts = 128;
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
max_peer_attempts as usize,
let max_attempt_delay = Duration::hours(1).num_seconds();
// check maximum 64 random disconnected healthy peers no more often than 1 hour per peer.
for hp in healthy
.iter()
.filter(|p| {
peers.get_connected_peer(p.addr).is_none()
&& Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
})
.choose_multiple(&mut thread_rng(), max_peer_attempts / 2)
{
new_peers.push(&hp.addr);
}
// always check min 32 (max 96, if there are no healthy) random unknown peers received from peer list request.
let req_unk_count = cmp::max(
max_peer_attempts / 2 - new_peers.len() + max_peer_attempts / 4,
max_peer_attempts / 4,
);
for upa in unknown
.iter()
.choose_multiple(&mut thread_rng(), req_unk_count)
{
new_peers.push(upa);
}
debug!(
"monitor_peers: check {} healthy, {} unknown, {} defuncts",
cmp::min(
new_peers.len() as i32,
((new_peers.len() - req_unk_count) as i32).abs()
),
cmp::min(new_peers.len(), req_unk_count),
max_peer_attempts - new_peers.len()
);
// check min 32 (max 128, if there are no healthy and unknown) random defunct peers no more often than 1 hour per peer.
for dp in defuncts
.iter()
.filter(|p| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
.choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len())
{
new_peers.push(&dp.addr);
}
// Only queue up connection attempts for candidate peers where we
// are confident we do not yet know about this peer.
// The call to is_known() may fail due to contention on the peers map.
// Do not attempt any connection where is_known() fails for any reason.
for p in new_peers {
if let Ok(false) = peers.is_known(p.addr) {
tx.send(p.addr).unwrap();
for pa in new_peers {
if let Ok(false) = peers.is_known(*pa) {
tx.send(*pa).unwrap();
}
}
}
@@ -252,10 +290,10 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
fn connect_to_seeds_and_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
config: P2PConfig,
) {
let peers_deny = config.peers_deny.unwrap_or(PeerAddrs::default());
let default_peers = PeerAddrs::default();
let peers_deny = config.peers_deny.as_ref().unwrap_or(&default_peers);
// If "peers_allow" is explicitly configured then just use this list
// remembering to filter out "peers_deny".
@@ -267,7 +305,7 @@ fn connect_to_seeds_and_peers(
}
// Always try our "peers_preferred" remembering to filter out "peers_deny".
if let Some(peers) = config.peers_preferred {
if let Some(peers) = config.peers_preferred.as_ref() {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
@@ -275,13 +313,13 @@ fn connect_to_seeds_and_peers(
// check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability)
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 100);
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 128);
// if so, get their addresses, otherwise use our seeds
let peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list()
seed_list(&config)
};
if peer_addrs.is_empty() {
@@ -307,15 +345,10 @@ fn listen_for_addrs(
) {
// Pull everything currently on the queue off the queue.
// Does not block so addrs may be empty.
// We will take(max_peers) from this later but we want to drain the rx queue
// We will take(max_peers) from this later, but we want to drain the rx queue
// here to prevent it backing up.
let addrs: Vec<PeerAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here.
if peers.enough_outbound_peers() {
return;
}
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them for safety.
let connect_min_interval = 30;
@@ -343,13 +376,15 @@ fn listen_for_addrs(
.name("peer_connect".to_string())
.spawn(move || match p2p_c.connect(addr) {
Ok(p) => {
if peers_c.enough_outbound_peers() {
return;
}
// If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST.
// We want to build a local db of possible peers to connect to.
// We do not necessarily care (at this point in time) what other capabilities these peers support.
if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
@@ -373,27 +408,43 @@ fn listen_for_addrs(
}
}
pub fn default_dns_seeds() -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(|| {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
})
fn seed_list(config: &P2PConfig) -> Vec<PeerAddr> {
match config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
vec![]
}
p2p::Seeding::List => match &config.seeds {
Some(seeds) => seeds.peers.clone(),
None => {
error!("Seeds must be configured for seeding type List");
vec![]
}
},
p2p::Seeding::DNSSeed => default_dns_seeds(),
_ => vec![],
}
}
fn default_dns_seeds() -> Vec<PeerAddr> {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
}
/// Convenience function to resolve dns addresses from DNS records
@@ -414,9 +465,3 @@ pub fn resolve_dns_to_addrs(dns_records: &Vec<String>) -> Vec<PeerAddr> {
debug!("Resolved addresses: {:?}", addresses);
addresses
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn predefined_seeds(addrs: Vec<PeerAddr>) -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(move || addrs.clone())
}
-18
View File
@@ -237,26 +237,8 @@ impl Server {
let mut connect_thread = None;
if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
let seed_list = match config.p2p_config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
seed::predefined_seeds(vec![])
}
p2p::Seeding::List => match &config.p2p_config.seeds {
Some(seeds) => seed::predefined_seeds(seeds.peers.clone()),
None => {
return Err(Error::Configuration(
"Seeds must be configured for seeding type List".to_owned(),
));
}
},
p2p::Seeding::DNSSeed => seed::default_dns_seeds(),
_ => unreachable!(),
};
connect_thread = Some(seed::connect_and_monitor(
p2p_server.clone(),
seed_list,
config.p2p_config.clone(),
stop_state.clone(),
)?);