p2p: cleanup before selection at monitor, add outbound to connected list only when there is not enough peers + disconnect extra peer immediately, reconnect to seeds at monitor to avoid stuck, update only defunct state to unknown when received existing peer address
This commit is contained in:
+6
-10
@@ -61,6 +61,7 @@ impl Peers {
|
||||
/// Adds the peer to our internal peer mapping. Note that the peer is still
|
||||
/// returned so the server can run it.
|
||||
pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> {
|
||||
let enough_outbound = self.enough_outbound_peers();
|
||||
let peer_data: PeerData;
|
||||
{
|
||||
// Scope for peers vector lock - dont hold the peers lock while adding to lmdb
|
||||
@@ -77,8 +78,10 @@ impl Peers {
|
||||
ban_reason: ReasonForBan::None,
|
||||
last_connected: Utc::now().timestamp(),
|
||||
};
|
||||
debug!("Adding newly connected peer {}.", peer_data.addr);
|
||||
peers.insert(peer_data.addr, peer);
|
||||
if !enough_outbound || !peer.info.is_outbound() {
|
||||
debug!("Adding newly connected peer {}.", peer_data.addr);
|
||||
peers.insert(peer_data.addr, peer);
|
||||
}
|
||||
}
|
||||
debug!("Saving newly connected peer {}.", peer_data.addr);
|
||||
if let Err(e) = self.save_peer(&peer_data) {
|
||||
@@ -179,13 +182,6 @@ impl Peers {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_healthy(&self, peer_addr: PeerAddr) -> bool {
|
||||
if let Ok(peer) = self.store.get_peer(peer_addr) {
|
||||
return peer.flags == State::Healthy;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn broadcast<F>(&self, obj_name: &str, inner: F) -> u32
|
||||
where
|
||||
F: Fn(&Peer) -> Result<bool, Error>,
|
||||
@@ -713,7 +709,7 @@ impl NetAdapter for Peers {
|
||||
let mut to_save: Vec<PeerData> = Vec::new();
|
||||
for pa in peer_addrs {
|
||||
if let Ok(mut p) = self.get_peer(pa) {
|
||||
if self.is_healthy(pa) || self.is_banned(pa) {
|
||||
if p.flags != State::Defunct {
|
||||
continue;
|
||||
}
|
||||
p.flags = State::Unknown;
|
||||
|
||||
@@ -187,6 +187,9 @@ impl Server {
|
||||
&self.handshake,
|
||||
self.peers.clone(),
|
||||
)?;
|
||||
if self.peers.enough_outbound_peers() {
|
||||
peer.stop();
|
||||
}
|
||||
let peer = Arc::new(peer);
|
||||
self.peers.add_connected(peer.clone())?;
|
||||
Ok(peer)
|
||||
|
||||
+60
-43
@@ -53,7 +53,6 @@ pub const TESTNET_DNS_SEEDS: &[&str] = &[
|
||||
|
||||
pub fn connect_and_monitor(
|
||||
p2p_server: Arc<p2p::Server>,
|
||||
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
|
||||
config: P2PConfig,
|
||||
stop_state: Arc<StopState>,
|
||||
) -> std::io::Result<thread::JoinHandle<()>> {
|
||||
@@ -67,7 +66,7 @@ pub fn connect_and_monitor(
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
// check seeds first
|
||||
connect_to_seeds_and_peers(peers.clone(), tx.clone(), seed_list, config);
|
||||
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
|
||||
|
||||
let mut prev = DateTime::<Utc>::MIN_UTC;
|
||||
let mut prev_expire_check = DateTime::<Utc>::MIN_UTC;
|
||||
@@ -130,6 +129,13 @@ pub fn connect_and_monitor(
|
||||
}
|
||||
|
||||
fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sender<PeerAddr>) {
|
||||
// maintenance step first, clean up p2p server peers
|
||||
peers.clean_peers(
|
||||
config.peer_max_inbound_count() as usize,
|
||||
config.peer_max_outbound_count() as usize,
|
||||
config.clone(),
|
||||
);
|
||||
|
||||
// regularly check if we need to acquire more peers and if so, gets
|
||||
// them from db
|
||||
let mut total_count = 0;
|
||||
@@ -181,12 +187,12 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
|
||||
unknown.len()
|
||||
);
|
||||
|
||||
// maintenance step first, clean up p2p server peers
|
||||
peers.clean_peers(
|
||||
config.peer_max_inbound_count() as usize,
|
||||
config.peer_max_outbound_count() as usize,
|
||||
config.clone(),
|
||||
);
|
||||
// Connect to seeds again if there is no peers at database,
|
||||
// helps to avoid stuck when 1st request to seed list was failed.
|
||||
if total_count == 0 {
|
||||
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
|
||||
return;
|
||||
}
|
||||
|
||||
if !peers.enough_outbound_peers() {
|
||||
// loop over connected peers that can provide peer lists
|
||||
@@ -208,14 +214,15 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
|
||||
}
|
||||
|
||||
// Attempt to connect to any preferred peers.
|
||||
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
|
||||
for p in peers_preferred {
|
||||
let default_peers = PeerAddrs::default();
|
||||
let peers_preferred = config.peers_preferred.as_ref().unwrap_or(&default_peers);
|
||||
for p in peers_preferred.peers.iter() {
|
||||
if !connected_peers.is_empty() {
|
||||
if !connected_peers.contains(&p) {
|
||||
let _ = tx.send(p);
|
||||
let _ = tx.send(*p);
|
||||
}
|
||||
} else {
|
||||
let _ = tx.send(p);
|
||||
let _ = tx.send(*p);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -265,10 +272,10 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
|
||||
fn connect_to_seeds_and_peers(
|
||||
peers: Arc<p2p::Peers>,
|
||||
tx: mpsc::Sender<PeerAddr>,
|
||||
seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
|
||||
config: P2PConfig,
|
||||
) {
|
||||
let peers_deny = config.peers_deny.unwrap_or(PeerAddrs::default());
|
||||
let default_peers = PeerAddrs::default();
|
||||
let peers_deny = config.peers_deny.as_ref().unwrap_or(&default_peers);
|
||||
|
||||
// If "peers_allow" is explicitly configured then just use this list
|
||||
// remembering to filter out "peers_deny".
|
||||
@@ -280,7 +287,7 @@ fn connect_to_seeds_and_peers(
|
||||
}
|
||||
|
||||
// Always try our "peers_preferred" remembering to filter out "peers_deny".
|
||||
if let Some(peers) = config.peers_preferred {
|
||||
if let Some(peers) = config.peers_preferred.as_ref() {
|
||||
for addr in peers.difference(peers_deny.as_slice()) {
|
||||
let _ = tx.send(addr);
|
||||
}
|
||||
@@ -294,7 +301,7 @@ fn connect_to_seeds_and_peers(
|
||||
let peer_addrs = if peers.len() > 3 {
|
||||
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
|
||||
} else {
|
||||
seed_list()
|
||||
seed_list(&config)
|
||||
};
|
||||
|
||||
if peer_addrs.is_empty() {
|
||||
@@ -380,27 +387,43 @@ fn listen_for_addrs(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_dns_seeds() -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
|
||||
Box::new(|| {
|
||||
let net_seeds = if global::is_testnet() {
|
||||
TESTNET_DNS_SEEDS
|
||||
} else {
|
||||
MAINNET_DNS_SEEDS
|
||||
};
|
||||
resolve_dns_to_addrs(
|
||||
&net_seeds
|
||||
.iter()
|
||||
.map(|s| {
|
||||
s.to_string()
|
||||
+ if global::is_testnet() {
|
||||
":13414"
|
||||
} else {
|
||||
":3414"
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
})
|
||||
fn seed_list(config: &P2PConfig) -> Vec<PeerAddr> {
|
||||
match config.seeding_type {
|
||||
p2p::Seeding::None => {
|
||||
warn!("No seed configured, will stay solo until connected to");
|
||||
vec![]
|
||||
}
|
||||
p2p::Seeding::List => match &config.seeds {
|
||||
Some(seeds) => seeds.peers.clone(),
|
||||
None => {
|
||||
error!("Seeds must be configured for seeding type List");
|
||||
vec![]
|
||||
}
|
||||
},
|
||||
p2p::Seeding::DNSSeed => default_dns_seeds(),
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
|
||||
fn default_dns_seeds() -> Vec<PeerAddr> {
|
||||
let net_seeds = if global::is_testnet() {
|
||||
TESTNET_DNS_SEEDS
|
||||
} else {
|
||||
MAINNET_DNS_SEEDS
|
||||
};
|
||||
resolve_dns_to_addrs(
|
||||
&net_seeds
|
||||
.iter()
|
||||
.map(|s| {
|
||||
s.to_string()
|
||||
+ if global::is_testnet() {
|
||||
":13414"
|
||||
} else {
|
||||
":3414"
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Convenience function to resolve dns addresses from DNS records
|
||||
@@ -421,9 +444,3 @@ pub fn resolve_dns_to_addrs(dns_records: &Vec<String>) -> Vec<PeerAddr> {
|
||||
debug!("Resolved addresses: {:?}", addresses);
|
||||
addresses
|
||||
}
|
||||
|
||||
/// Convenience function when the seed list is immediately known. Mostly used
|
||||
/// for tests.
|
||||
pub fn predefined_seeds(addrs: Vec<PeerAddr>) -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
|
||||
Box::new(move || addrs.clone())
|
||||
}
|
||||
|
||||
@@ -237,26 +237,8 @@ impl Server {
|
||||
let mut connect_thread = None;
|
||||
|
||||
if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
|
||||
let seed_list = match config.p2p_config.seeding_type {
|
||||
p2p::Seeding::None => {
|
||||
warn!("No seed configured, will stay solo until connected to");
|
||||
seed::predefined_seeds(vec![])
|
||||
}
|
||||
p2p::Seeding::List => match &config.p2p_config.seeds {
|
||||
Some(seeds) => seed::predefined_seeds(seeds.peers.clone()),
|
||||
None => {
|
||||
return Err(Error::Configuration(
|
||||
"Seeds must be configured for seeding type List".to_owned(),
|
||||
));
|
||||
}
|
||||
},
|
||||
p2p::Seeding::DNSSeed => seed::default_dns_seeds(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
connect_thread = Some(seed::connect_and_monitor(
|
||||
p2p_server.clone(),
|
||||
seed_list,
|
||||
config.p2p_config.clone(),
|
||||
stop_state.clone(),
|
||||
)?);
|
||||
|
||||
Reference in New Issue
Block a user