Merge branch 'master' into pibd_peers_fix

This commit is contained in:
ardocrat
2026-04-22 11:08:02 +03:00
6 changed files with 188 additions and 137 deletions
+137 -92
View File
@@ -53,7 +53,6 @@ pub const TESTNET_DNS_SEEDS: &[&str] = &[
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
config: P2PConfig,
stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> {
@@ -67,7 +66,7 @@ pub fn connect_and_monitor(
let (tx, rx) = mpsc::channel();
// check seeds first
connect_to_seeds_and_peers(peers.clone(), tx.clone(), seed_list, config);
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
let mut prev = DateTime::<Utc>::MIN_UTC;
let mut prev_expire_check = DateTime::<Utc>::MIN_UTC;
@@ -130,12 +129,20 @@ pub fn connect_and_monitor(
}
fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sender<PeerAddr>) {
// maintenance step first, clean up p2p server peers
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
// regularly check if we need to acquire more peers and if so, gets
// them from db
let mut total_count = 0;
let mut healthy_count = 0;
let mut banned_count = 0;
let mut healthy = vec![];
let mut defuncts = vec![];
let mut unknown = vec![];
for x in peers.all_peer_data().into_iter() {
match x.flags {
@@ -154,8 +161,9 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
banned_count += 1;
}
}
p2p::State::Healthy => healthy_count += 1,
p2p::State::Healthy => healthy.push(x),
p2p::State::Defunct => defuncts.push(x),
p2p::State::Unknown => unknown.push(x.addr),
}
total_count += 1;
}
@@ -167,82 +175,112 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
all {} = {} healthy + {} banned + {} defunct + {} unknown",
config.host,
config.port,
peers_count,
most_work_count,
total_count,
healthy_count,
healthy.len(),
banned_count,
defuncts.len(),
unknown.len()
);
// maintenance step first, clean up p2p server peers
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
if peers.enough_outbound_peers() {
// Connect to seeds again if there is no peers at database,
// helps to avoid stuck when 1st request to seed list was failed.
if total_count == 0 {
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
return;
}
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
// Attempt to connect to any preferred peers.
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
for p in peers_preferred {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(p);
}
} else {
let _ = tx.send(p);
if !peers.enough_outbound_peers() {
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
}
// take a random defunct peer and mark it healthy: over a long enough period any
// peer will see another as defunct eventually, gives us a chance to retry
if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
let _ = peers.update_state(peer.addr, p2p::State::Healthy);
// Attempt to connect to any preferred peers.
let default_peers = PeerAddrs::default();
let peers_preferred = config.peers_preferred.as_ref().unwrap_or(&default_peers);
for p in peers_preferred.peers.iter() {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(*p);
}
} else {
let _ = tx.send(*p);
}
}
}
// find some peers from our db
// and queue them up for a connection attempt
// intentionally make too many attempts (2x) as some (most?) will fail
// as many nodes in our db are not publicly accessible
let mut new_peers = vec![];
let max_peer_attempts = 128;
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
max_peer_attempts as usize,
let max_attempt_delay = Duration::hours(1).num_seconds();
// check maximum 64 random disconnected healthy peers no more often than 1 hour per peer.
for hp in healthy
.iter()
.filter(|p| {
peers.get_connected_peer(p.addr).is_none()
&& Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
})
.choose_multiple(&mut thread_rng(), max_peer_attempts / 2)
{
new_peers.push(&hp.addr);
}
// always check min 32 (max 96, if there are no healthy) random unknown peers received from peer list request.
let req_unk_count = cmp::max(
max_peer_attempts / 2 - new_peers.len() + max_peer_attempts / 4,
max_peer_attempts / 4,
);
for upa in unknown
.iter()
.choose_multiple(&mut thread_rng(), req_unk_count)
{
new_peers.push(upa);
}
debug!(
"monitor_peers: check {} healthy, {} unknown, {} defuncts",
cmp::min(
new_peers.len() as i32,
((new_peers.len() - req_unk_count) as i32).abs()
),
cmp::min(new_peers.len(), req_unk_count),
max_peer_attempts - new_peers.len()
);
// check min 32 (max 128, if there are no healthy and unknown) random defunct peers no more often than 1 hour per peer.
for dp in defuncts
.iter()
.filter(|p| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
.choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len())
{
new_peers.push(&dp.addr);
}
// Only queue up connection attempts for candidate peers where we
// are confident we do not yet know about this peer.
// The call to is_known() may fail due to contention on the peers map.
// Do not attempt any connection where is_known() fails for any reason.
for p in new_peers {
if let Ok(false) = peers.is_known(p.addr) {
tx.send(p.addr).unwrap();
for pa in new_peers {
if let Ok(false) = peers.is_known(*pa) {
tx.send(*pa).unwrap();
}
}
}
@@ -252,10 +290,10 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
fn connect_to_seeds_and_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
config: P2PConfig,
) {
let peers_deny = config.peers_deny.unwrap_or(PeerAddrs::default());
let default_peers = PeerAddrs::default();
let peers_deny = config.peers_deny.as_ref().unwrap_or(&default_peers);
// If "peers_allow" is explicitly configured then just use this list
// remembering to filter out "peers_deny".
@@ -267,7 +305,7 @@ fn connect_to_seeds_and_peers(
}
// Always try our "peers_preferred" remembering to filter out "peers_deny".
if let Some(peers) = config.peers_preferred {
if let Some(peers) = config.peers_preferred.as_ref() {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
@@ -275,13 +313,13 @@ fn connect_to_seeds_and_peers(
// check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability)
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 100);
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 128);
// if so, get their addresses, otherwise use our seeds
let peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list()
seed_list(&config)
};
if peer_addrs.is_empty() {
@@ -307,15 +345,10 @@ fn listen_for_addrs(
) {
// Pull everything currently on the queue off the queue.
// Does not block so addrs may be empty.
// We will take(max_peers) from this later but we want to drain the rx queue
// We will take(max_peers) from this later, but we want to drain the rx queue
// here to prevent it backing up.
let addrs: Vec<PeerAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here.
if peers.enough_outbound_peers() {
return;
}
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them for safety.
let connect_min_interval = 30;
@@ -343,13 +376,15 @@ fn listen_for_addrs(
.name("peer_connect".to_string())
.spawn(move || match p2p_c.connect(addr) {
Ok(p) => {
if peers_c.enough_outbound_peers() {
return;
}
// If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST.
// We want to build a local db of possible peers to connect to.
// We do not necessarily care (at this point in time) what other capabilities these peers support.
if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
@@ -373,27 +408,43 @@ fn listen_for_addrs(
}
}
pub fn default_dns_seeds() -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(|| {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
})
fn seed_list(config: &P2PConfig) -> Vec<PeerAddr> {
match config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
vec![]
}
p2p::Seeding::List => match &config.seeds {
Some(seeds) => seeds.peers.clone(),
None => {
error!("Seeds must be configured for seeding type List");
vec![]
}
},
p2p::Seeding::DNSSeed => default_dns_seeds(),
_ => vec![],
}
}
fn default_dns_seeds() -> Vec<PeerAddr> {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
}
/// Convenience function to resolve dns addresses from DNS records
@@ -414,9 +465,3 @@ pub fn resolve_dns_to_addrs(dns_records: &Vec<String>) -> Vec<PeerAddr> {
debug!("Resolved addresses: {:?}", addresses);
addresses
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn predefined_seeds(addrs: Vec<PeerAddr>) -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(move || addrs.clone())
}
-18
View File
@@ -237,26 +237,8 @@ impl Server {
let mut connect_thread = None;
if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
let seed_list = match config.p2p_config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
seed::predefined_seeds(vec![])
}
p2p::Seeding::List => match &config.p2p_config.seeds {
Some(seeds) => seed::predefined_seeds(seeds.peers.clone()),
None => {
return Err(Error::Configuration(
"Seeds must be configured for seeding type List".to_owned(),
));
}
},
p2p::Seeding::DNSSeed => seed::default_dns_seeds(),
_ => unreachable!(),
};
connect_thread = Some(seed::connect_and_monitor(
p2p_server.clone(),
seed_list,
config.p2p_config.clone(),
stop_state.clone(),
)?);