Merge remote-tracking branch 'grin_ardocrat/peers_fix' into grim

This commit is contained in:
ardocrat
2026-04-06 02:27:21 +03:00
6 changed files with 154 additions and 132 deletions
+23 -14
View File
@@ -61,6 +61,7 @@ impl Peers {
/// Adds the peer to our internal peer mapping. Note that the peer is still
/// returned so the server can run it.
pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> {
let enough_outbound = self.enough_outbound_peers();
let peer_data: PeerData;
{
// Scope for peers vector lock - dont hold the peers lock while adding to lmdb
@@ -77,8 +78,10 @@ impl Peers {
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
};
debug!("Adding newly connected peer {}.", peer_data.addr);
peers.insert(peer_data.addr, peer);
if !enough_outbound || !peer.info.is_outbound() {
debug!("Adding newly connected peer {}.", peer_data.addr);
peers.insert(peer_data.addr, peer);
}
}
debug!("Saving newly connected peer {}.", peer_data.addr);
if let Err(e) = self.save_peer(&peer_data) {
@@ -142,6 +145,7 @@ impl Peers {
}
false
}
/// Ban a peer, disconnecting it if we're currently connected
pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) -> Result<(), Error> {
// Update the peer in peers db
@@ -261,6 +265,8 @@ impl Peers {
break;
}
};
// Mark peer as defunct after ping failure.
let _ = self.update_state(p.info.addr, State::Defunct);
p.stop();
peers.remove(&p.info.addr);
}
@@ -747,21 +753,24 @@ impl NetAdapter for Peers {
trace!("Received {} peer addrs, saving.", peer_addrs.len());
let mut to_save: Vec<PeerData> = Vec::new();
for pa in peer_addrs {
if let Ok(e) = self.exists_peer(pa) {
if e {
if let Ok(mut p) = self.get_peer(pa) {
if p.flags != State::Defunct {
continue;
}
p.flags = State::Unknown;
to_save.push(p);
} else {
let peer = PeerData {
addr: pa,
capabilities: Capabilities::UNKNOWN,
user_agent: "".to_string(),
flags: State::Unknown,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: 0,
};
to_save.push(peer);
}
let peer = PeerData {
addr: pa,
capabilities: Capabilities::UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
};
to_save.push(peer);
}
if let Err(e) = self.save_peers(to_save) {
error!("Could not save received peer addresses: {:?}", e);
+3
View File
@@ -187,6 +187,9 @@ impl Server {
&self.handshake,
self.peers.clone(),
)?;
if self.peers.enough_outbound_peers() {
peer.stop();
}
let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?;
Ok(peer)
+2 -1
View File
@@ -27,13 +27,14 @@ const STORE_SUBPATH: &str = "peers";
const PEER_PREFIX: u8 = b'P';
// Types of messages
// Types of peers
enum_from_primitive! {
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum State {
Healthy = 0,
Banned = 1,
Defunct = 2,
Unknown = 3,
}
}
+7 -4
View File
@@ -183,10 +183,13 @@ impl<'de> Visitor<'de> for PeerAddrs {
Ok(ip) => peers.push(PeerAddr(ip)),
// If that fails it's probably a DNS record
Err(_) => {
let socket_addrs = entry.to_socket_addrs().map_err(|_| {
serde::de::Error::custom(format!("Unable to resolve DNS: {}", entry))
})?;
peers.append(&mut socket_addrs.map(PeerAddr).collect());
let socket_addrs: Result<std::vec::IntoIter<SocketAddr>, M::Error> =
entry.to_socket_addrs().map_err(|_| {
serde::de::Error::custom(format!("Unable to resolve DNS: {}", entry))
});
if let Ok(socket_addrs) = socket_addrs {
peers.append(&mut socket_addrs.map(PeerAddr).collect());
}
}
}
}
+119 -95
View File
@@ -53,7 +53,6 @@ pub const TESTNET_DNS_SEEDS: &[&str] = &[
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
config: P2PConfig,
stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> {
@@ -67,7 +66,7 @@ pub fn connect_and_monitor(
let (tx, rx) = mpsc::channel();
// check seeds first
connect_to_seeds_and_peers(peers.clone(), tx.clone(), seed_list, config);
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
let mut prev = DateTime::<Utc>::MIN_UTC;
let mut prev_expire_check = DateTime::<Utc>::MIN_UTC;
@@ -130,12 +129,20 @@ pub fn connect_and_monitor(
}
fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sender<PeerAddr>) {
// maintenance step first, clean up p2p server peers
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
// regularly check if we need to acquire more peers and if so, gets
// them from db
let mut total_count = 0;
let mut healthy_count = 0;
let mut banned_count = 0;
let mut healthy = vec![];
let mut defuncts = vec![];
let mut unknown = vec![];
for x in peers.all_peer_data().into_iter() {
match x.flags {
@@ -154,8 +161,9 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
banned_count += 1;
}
}
p2p::State::Healthy => healthy_count += 1,
p2p::State::Defunct => defuncts.push(x),
p2p::State::Healthy => healthy.push(x.addr),
p2p::State::Defunct => defuncts.push(x.addr),
p2p::State::Unknown => unknown.push(x.addr),
}
total_count += 1;
}
@@ -167,82 +175,94 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
all {} = {} healthy + {} banned + {} defunct + {} unknown",
config.host,
config.port,
peers_count,
most_work_count,
total_count,
healthy_count,
healthy.len(),
banned_count,
defuncts.len(),
unknown.len()
);
// maintenance step first, clean up p2p server peers
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
if peers.enough_outbound_peers() {
// Connect to seeds again if there is no peers at database,
// helps to avoid stuck when 1st request to seed list was failed.
if total_count == 0 {
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
return;
}
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
// Attempt to connect to any preferred peers.
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
for p in peers_preferred {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(p);
}
} else {
let _ = tx.send(p);
if !peers.enough_outbound_peers() {
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
}
// take a random defunct peer and mark it healthy: over a long enough period any
// peer will see another as defunct eventually, gives us a chance to retry
if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
let _ = peers.update_state(peer.addr, p2p::State::Healthy);
// Attempt to connect to any preferred peers.
let default_peers = PeerAddrs::default();
let peers_preferred = config.peers_preferred.as_ref().unwrap_or(&default_peers);
for p in peers_preferred.peers.iter() {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(*p);
}
} else {
let _ = tx.send(*p);
}
}
}
// find some peers from our db
// and queue them up for a connection attempt
// intentionally make too many attempts (2x) as some (most?) will fail
// as many nodes in our db are not publicly accessible
let mut new_peers = vec![];
let max_peer_attempts = 128;
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
max_peer_attempts as usize,
);
// check random disconnected healthy peers.
for hpa in healthy
.iter()
.filter(|p| peers.get_connected_peer(**p).is_none())
.choose_multiple(&mut thread_rng(), max_peer_attempts)
{
new_peers.push(hpa);
}
// check random unknown peers received from peer list request.
for upa in unknown
.iter()
.choose_multiple(&mut thread_rng(), max_peer_attempts)
{
new_peers.push(upa);
}
// always check random defunct peers.
for dpa in defuncts
.iter()
.choose_multiple(&mut thread_rng(), max_peer_attempts)
{
new_peers.push(dpa);
}
// Only queue up connection attempts for candidate peers where we
// are confident we do not yet know about this peer.
// The call to is_known() may fail due to contention on the peers map.
// Do not attempt any connection where is_known() fails for any reason.
for p in new_peers {
if let Ok(false) = peers.is_known(p.addr) {
tx.send(p.addr).unwrap();
for pa in new_peers {
if let Ok(false) = peers.is_known(*pa) {
tx.send(*pa).unwrap();
}
}
}
@@ -252,10 +272,10 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
fn connect_to_seeds_and_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
config: P2PConfig,
) {
let peers_deny = config.peers_deny.unwrap_or(PeerAddrs::default());
let default_peers = PeerAddrs::default();
let peers_deny = config.peers_deny.as_ref().unwrap_or(&default_peers);
// If "peers_allow" is explicitly configured then just use this list
// remembering to filter out "peers_deny".
@@ -267,7 +287,7 @@ fn connect_to_seeds_and_peers(
}
// Always try our "peers_preferred" remembering to filter out "peers_deny".
if let Some(peers) = config.peers_preferred {
if let Some(peers) = config.peers_preferred.as_ref() {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
@@ -275,13 +295,13 @@ fn connect_to_seeds_and_peers(
// check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability)
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 100);
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 128);
// if so, get their addresses, otherwise use our seeds
let peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list()
seed_list(&config)
};
if peer_addrs.is_empty() {
@@ -307,19 +327,14 @@ fn listen_for_addrs(
) {
// Pull everything currently on the queue off the queue.
// Does not block so addrs may be empty.
// We will take(max_peers) from this later but we want to drain the rx queue
// We will take(max_peers) from this later, but we want to drain the rx queue
// here to prevent it backing up.
let addrs: Vec<PeerAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here.
if peers.enough_outbound_peers() {
return;
}
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them for safety.
let connect_min_interval = 30;
let max_outbound_attempts = 128;
let max_outbound_attempts = 128 * 3;
for addr in addrs.into_iter().take(max_outbound_attempts) {
// ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now();
@@ -349,7 +364,6 @@ fn listen_for_addrs(
if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
@@ -373,27 +387,43 @@ fn listen_for_addrs(
}
}
pub fn default_dns_seeds() -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(|| {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
})
fn seed_list(config: &P2PConfig) -> Vec<PeerAddr> {
match config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
vec![]
}
p2p::Seeding::List => match &config.seeds {
Some(seeds) => seeds.peers.clone(),
None => {
error!("Seeds must be configured for seeding type List");
vec![]
}
},
p2p::Seeding::DNSSeed => default_dns_seeds(),
_ => vec![],
}
}
fn default_dns_seeds() -> Vec<PeerAddr> {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
}
/// Convenience function to resolve dns addresses from DNS records
@@ -414,9 +444,3 @@ pub fn resolve_dns_to_addrs(dns_records: &Vec<String>) -> Vec<PeerAddr> {
debug!("Resolved addresses: {:?}", addresses);
addresses
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn predefined_seeds(addrs: Vec<PeerAddr>) -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(move || addrs.clone())
}
-18
View File
@@ -237,26 +237,8 @@ impl Server {
let mut connect_thread = None;
if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
let seed_list = match config.p2p_config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
seed::predefined_seeds(vec![])
}
p2p::Seeding::List => match &config.p2p_config.seeds {
Some(seeds) => seed::predefined_seeds(seeds.peers.clone()),
None => {
return Err(Error::Configuration(
"Seeds must be configured for seeding type List".to_owned(),
));
}
},
p2p::Seeding::DNSSeed => seed::default_dns_seeds(),
_ => unreachable!(),
};
connect_thread = Some(seed::connect_and_monitor(
p2p_server.clone(),
seed_list,
config.p2p_config.clone(),
stop_state.clone(),
)?);