Fix PIBD sync stuck (#3817)
* pibd: record segment peer to exclude it from re-request * desegmenter is blocked on a missing kernel * segment iter * add indexing * pibd: log excluded peer * segment batch optimization * pibd: debug logging for stale requests * pibd: filter excluded peer * pibd: check existing kernel segment identifier * pibd: simplify peer selection condition --------- Co-authored-by: Joerg <wiesche89@googlemail.com>
This commit is contained in:
@@ -31,9 +31,12 @@ pub const KERNEL_SEGMENT_HEIGHT: u8 = 11;
|
||||
/// Maximum number of received segments to cache (across all trees) before we stop requesting others
|
||||
pub const MAX_CACHED_SEGMENTS: usize = 15;
|
||||
|
||||
/// Number of segments to apply in a single LMDB transaction
|
||||
pub const SEGMENT_APPLY_BATCH_SIZE: usize = 4;
|
||||
|
||||
/// How long the state sync should wait after requesting a segment from a peer before
|
||||
/// deciding the segment isn't going to arrive. The syncer will then re-request the segment
|
||||
pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60;
|
||||
pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20;
|
||||
|
||||
/// Number of simultaneous requests for segments we should make. Note this is currently
|
||||
/// divisible by 3 to try and evenly spread requests amount the 3 main MMRs (Bitmap segments
|
||||
|
||||
+238
-111
@@ -50,6 +50,7 @@ pub struct Desegmenter {
|
||||
default_output_segment_height: u8,
|
||||
default_rangeproof_segment_height: u8,
|
||||
default_kernel_segment_height: u8,
|
||||
segment_apply_batch_size: usize,
|
||||
|
||||
bitmap_accumulator: BitmapAccumulator,
|
||||
bitmap_segment_cache: Vec<Segment<BitmapChunk>>,
|
||||
@@ -93,6 +94,7 @@ impl Desegmenter {
|
||||
default_output_segment_height: pibd_params::OUTPUT_SEGMENT_HEIGHT,
|
||||
default_rangeproof_segment_height: pibd_params::RANGEPROOF_SEGMENT_HEIGHT,
|
||||
default_kernel_segment_height: pibd_params::KERNEL_SEGMENT_HEIGHT,
|
||||
segment_apply_batch_size: pibd_params::SEGMENT_APPLY_BATCH_SIZE,
|
||||
bitmap_segment_cache: vec![],
|
||||
output_segment_cache: vec![],
|
||||
rangeproof_segment_cache: vec![],
|
||||
@@ -384,6 +386,12 @@ impl Desegmenter {
|
||||
.find(|s| s.1.identifier().idx == bmp_idx)
|
||||
{
|
||||
self.apply_bitmap_segment(idx)?;
|
||||
} else {
|
||||
debug!(
|
||||
"desegmenter: waiting for bitmap segment idx {} (cache size {})",
|
||||
bmp_idx,
|
||||
self.bitmap_segment_cache.len()
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// Check if we need to finalize bitmap
|
||||
@@ -394,48 +402,72 @@ impl Desegmenter {
|
||||
|
||||
// Check if we can apply the next output segment(s)
|
||||
if let Some(next_output_idx) = self.next_required_output_segment_index() {
|
||||
if let Some((idx, _seg)) = self
|
||||
.output_segment_cache
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|s| s.1.identifier().idx == next_output_idx)
|
||||
{
|
||||
self.apply_output_segment(idx)?;
|
||||
}
|
||||
} else {
|
||||
if self.output_segment_cache.len() >= self.max_cached_segments {
|
||||
self.output_segment_cache = vec![];
|
||||
let segments = Self::take_segment_batch(
|
||||
&mut self.output_segment_cache,
|
||||
next_output_idx,
|
||||
self.segment_apply_batch_size,
|
||||
);
|
||||
if segments.is_empty() {
|
||||
debug!(
|
||||
"desegmenter: waiting for output segment idx {} (cache size {})",
|
||||
next_output_idx,
|
||||
self.output_segment_cache.len()
|
||||
);
|
||||
} else {
|
||||
self.apply_output_segments(segments)?;
|
||||
}
|
||||
} else if self.output_segment_cache.len() >= self.max_cached_segments {
|
||||
debug!(
|
||||
"desegmenter: dropping {} cached output segments waiting for next requirement",
|
||||
self.output_segment_cache.len()
|
||||
);
|
||||
self.output_segment_cache = vec![];
|
||||
}
|
||||
// Check if we can apply the next rangeproof segment
|
||||
if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() {
|
||||
if let Some((idx, _seg)) = self
|
||||
.rangeproof_segment_cache
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|s| s.1.identifier().idx == next_rp_idx)
|
||||
{
|
||||
self.apply_rangeproof_segment(idx)?;
|
||||
}
|
||||
} else {
|
||||
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
|
||||
self.rangeproof_segment_cache = vec![];
|
||||
let segments = Self::take_segment_batch(
|
||||
&mut self.rangeproof_segment_cache,
|
||||
next_rp_idx,
|
||||
self.segment_apply_batch_size,
|
||||
);
|
||||
if segments.is_empty() {
|
||||
debug!(
|
||||
"desegmenter: waiting for rangeproof segment idx {} (cache size {})",
|
||||
next_rp_idx,
|
||||
self.rangeproof_segment_cache.len()
|
||||
);
|
||||
} else {
|
||||
self.apply_rangeproof_segments(segments)?;
|
||||
}
|
||||
} else if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
|
||||
debug!(
|
||||
"desegmenter: dropping {} cached rangeproof segments waiting for next requirement",
|
||||
self.rangeproof_segment_cache.len()
|
||||
);
|
||||
self.rangeproof_segment_cache = vec![];
|
||||
}
|
||||
// Check if we can apply the next kernel segment
|
||||
if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() {
|
||||
if let Some((idx, _seg)) = self
|
||||
.kernel_segment_cache
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|s| s.1.identifier().idx == next_kernel_idx)
|
||||
{
|
||||
self.apply_kernel_segment(idx)?;
|
||||
}
|
||||
} else {
|
||||
if self.kernel_segment_cache.len() >= self.max_cached_segments {
|
||||
self.kernel_segment_cache = vec![];
|
||||
let segments = Self::take_segment_batch(
|
||||
&mut self.kernel_segment_cache,
|
||||
next_kernel_idx,
|
||||
self.segment_apply_batch_size,
|
||||
);
|
||||
if segments.is_empty() {
|
||||
debug!(
|
||||
"desegmenter: waiting for kernel segment idx {} (cache size {})",
|
||||
next_kernel_idx,
|
||||
self.kernel_segment_cache.len()
|
||||
);
|
||||
} else {
|
||||
self.apply_kernel_segments(segments)?;
|
||||
}
|
||||
} else if self.kernel_segment_cache.len() >= self.max_cached_segments {
|
||||
debug!(
|
||||
"desegmenter: dropping {} cached kernel segments waiting for next requirement",
|
||||
self.kernel_segment_cache.len()
|
||||
);
|
||||
self.kernel_segment_cache = vec![];
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -478,79 +510,105 @@ impl Desegmenter {
|
||||
local_kernel_mmr_size = txhashset.kernel_mmr_size();
|
||||
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
|
||||
}
|
||||
// TODO: Fix, alternative approach, this is very inefficient
|
||||
let mut output_identifier_iter = SegmentIdentifier::traversal_iter(
|
||||
let total_output_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.output_mmr_size,
|
||||
self.default_output_segment_height,
|
||||
);
|
||||
|
||||
let mut elems_added = 0;
|
||||
while let Some(output_id) = output_identifier_iter.next() {
|
||||
// Advance output iterator to next needed position
|
||||
let (_first, last) =
|
||||
output_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last <= local_output_mmr_size {
|
||||
continue;
|
||||
}
|
||||
if self.output_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if !self.has_output_segment_with_id(output_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
|
||||
while (next_output_idx as usize) < total_output_segments {
|
||||
if self.output_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let output_id = SegmentIdentifier {
|
||||
height: self.default_output_segment_height,
|
||||
idx: next_output_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
output_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_output_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let mut rangeproof_identifier_iter = SegmentIdentifier::traversal_iter(
|
||||
let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.output_mmr_size,
|
||||
self.default_rangeproof_segment_height,
|
||||
);
|
||||
|
||||
elems_added = 0;
|
||||
while let Some(rp_id) = rangeproof_identifier_iter.next() {
|
||||
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
// Advance rangeproof iterator to next needed position
|
||||
if last <= local_rangeproof_mmr_size {
|
||||
continue;
|
||||
}
|
||||
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if !self.has_rangeproof_segment_with_id(rp_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
|
||||
while (next_rp_idx as usize) < total_rangeproof_segments {
|
||||
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let rp_id = SegmentIdentifier {
|
||||
height: self.default_rangeproof_segment_height,
|
||||
idx: next_rp_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
rp_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last > local_rangeproof_mmr_size
|
||||
&& !self.has_rangeproof_segment_with_id(rp_id)
|
||||
{
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_rp_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let mut kernel_identifier_iter = SegmentIdentifier::traversal_iter(
|
||||
let total_kernel_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.kernel_mmr_size,
|
||||
self.default_kernel_segment_height,
|
||||
);
|
||||
|
||||
elems_added = 0;
|
||||
while let Some(k_id) = kernel_identifier_iter.next() {
|
||||
// Advance kernel iterator to next needed position
|
||||
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
|
||||
// Advance rangeproof iterator to next needed position
|
||||
if last <= local_kernel_mmr_size {
|
||||
continue;
|
||||
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
|
||||
while (next_kernel_idx as usize) < total_kernel_segments {
|
||||
if self.kernel_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let k_id = SegmentIdentifier {
|
||||
height: self.default_kernel_segment_height,
|
||||
idx: next_kernel_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
|
||||
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_kernel_idx += 1;
|
||||
}
|
||||
if self.kernel_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if !self.has_kernel_segment_with_id(k_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Always ensure we explicitly ask for the very next kernel segment we are waiting on.
|
||||
// The regular round-robin above can get saturated with outputs and rangeproofs while
|
||||
// the desegmenter is blocked on a missing kernel, so we force this one in.
|
||||
if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() {
|
||||
let seg_id = SegmentIdentifier {
|
||||
height: self.default_kernel_segment_height,
|
||||
idx: next_kernel_idx,
|
||||
};
|
||||
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
|
||||
if !self.has_kernel_segment_with_id(seg_id)
|
||||
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
|
||||
{
|
||||
if return_vec.len() >= max_elements {
|
||||
return_vec.pop();
|
||||
}
|
||||
return_vec.push(next_kernel_seg_id);
|
||||
}
|
||||
}
|
||||
if return_vec.is_empty() && self.bitmap_cache.is_some() {
|
||||
@@ -708,12 +766,36 @@ impl Desegmenter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply an output segment at the index cache
|
||||
pub fn apply_output_segment(&mut self, idx: usize) -> Result<(), Error> {
|
||||
let segment = self.output_segment_cache.remove(idx);
|
||||
fn take_segment_batch<T>(
|
||||
cache: &mut Vec<Segment<T>>,
|
||||
start_idx: u64,
|
||||
max_segments: usize,
|
||||
) -> Vec<Segment<T>> {
|
||||
let mut result = Vec::new();
|
||||
let mut next_idx = start_idx;
|
||||
while result.len() < max_segments {
|
||||
if let Some(pos) = cache.iter().position(|s| s.identifier().idx == next_idx) {
|
||||
result.push(cache.remove(pos));
|
||||
next_idx += 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Apply a batch of output segments
|
||||
pub fn apply_output_segments(
|
||||
&mut self,
|
||||
segments: Vec<Segment<OutputIdentifier>>,
|
||||
) -> Result<(), Error> {
|
||||
if segments.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
trace!(
|
||||
"pibd_desegmenter: applying output segment at segment idx {}",
|
||||
segment.identifier().idx
|
||||
"pibd_desegmenter: applying {} output segment(s) starting at idx {}",
|
||||
segments.len(),
|
||||
segments.first().map(|s| s.identifier().idx).unwrap_or(0)
|
||||
);
|
||||
let mut header_pmmr = self.header_pmmr.write();
|
||||
let mut txhashset = self.txhashset.write();
|
||||
@@ -724,7 +806,9 @@ impl Desegmenter {
|
||||
&mut batch,
|
||||
|ext, _batch| {
|
||||
let extension = &mut ext.extension;
|
||||
extension.apply_output_segment(segment)?;
|
||||
for segment in segments {
|
||||
extension.apply_output_segment(segment)?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
@@ -821,12 +905,18 @@ impl Desegmenter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a rangeproof segment at the index cache
|
||||
pub fn apply_rangeproof_segment(&mut self, idx: usize) -> Result<(), Error> {
|
||||
let segment = self.rangeproof_segment_cache.remove(idx);
|
||||
/// Apply a batch of rangeproof segments
|
||||
pub fn apply_rangeproof_segments(
|
||||
&mut self,
|
||||
segments: Vec<Segment<RangeProof>>,
|
||||
) -> Result<(), Error> {
|
||||
if segments.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
trace!(
|
||||
"pibd_desegmenter: applying rangeproof segment at segment idx {}",
|
||||
segment.identifier().idx
|
||||
"pibd_desegmenter: applying {} rangeproof segment(s) starting at idx {}",
|
||||
segments.len(),
|
||||
segments.first().map(|s| s.identifier().idx).unwrap_or(0)
|
||||
);
|
||||
let mut header_pmmr = self.header_pmmr.write();
|
||||
let mut txhashset = self.txhashset.write();
|
||||
@@ -837,7 +927,9 @@ impl Desegmenter {
|
||||
&mut batch,
|
||||
|ext, _batch| {
|
||||
let extension = &mut ext.extension;
|
||||
extension.apply_rangeproof_segment(segment)?;
|
||||
for segment in segments {
|
||||
extension.apply_rangeproof_segment(segment)?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
@@ -920,12 +1012,16 @@ impl Desegmenter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a kernel segment at the index cache
|
||||
pub fn apply_kernel_segment(&mut self, idx: usize) -> Result<(), Error> {
|
||||
let segment = self.kernel_segment_cache.remove(idx);
|
||||
trace!(
|
||||
"pibd_desegmenter: applying kernel segment at segment idx {}",
|
||||
segment.identifier().idx
|
||||
/// Apply a batch of kernel segments at the index cache
|
||||
pub fn apply_kernel_segments(&mut self, segments: Vec<Segment<TxKernel>>) -> Result<(), Error> {
|
||||
if segments.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let first_idx = segments.first().map(|s| s.identifier().idx).unwrap_or(0);
|
||||
debug!(
|
||||
"pibd_desegmenter: applying {} kernel segment(s) starting at idx {}",
|
||||
segments.len(),
|
||||
first_idx
|
||||
);
|
||||
let mut header_pmmr = self.header_pmmr.write();
|
||||
let mut txhashset = self.txhashset.write();
|
||||
@@ -936,7 +1032,20 @@ impl Desegmenter {
|
||||
&mut batch,
|
||||
|ext, _batch| {
|
||||
let extension = &mut ext.extension;
|
||||
extension.apply_kernel_segment(segment)?;
|
||||
for segment in segments {
|
||||
let seg_idx = segment.identifier().idx;
|
||||
if let Err(e) = extension.apply_kernel_segment(segment) {
|
||||
error!(
|
||||
"pibd_desegmenter: failed to apply kernel segment idx {}: {}",
|
||||
seg_idx, e
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
debug!(
|
||||
"pibd_desegmenter: successfully applied kernel segment idx {}",
|
||||
seg_idx
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
@@ -985,13 +1094,31 @@ impl Desegmenter {
|
||||
|
||||
/// Adds a Kernel segment
|
||||
pub fn add_kernel_segment(&mut self, segment: Segment<TxKernel>) -> Result<(), Error> {
|
||||
trace!("pibd_desegmenter: add kernel segment");
|
||||
segment.validate(
|
||||
self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated
|
||||
None,
|
||||
self.archive_header.kernel_root, // Kernel root we're checking for
|
||||
)?;
|
||||
let idx = segment.identifier().idx;
|
||||
debug!(
|
||||
"pibd_desegmenter: received kernel segment idx {} (cache size {})",
|
||||
idx,
|
||||
self.kernel_segment_cache.len()
|
||||
);
|
||||
segment
|
||||
.validate(
|
||||
self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated
|
||||
None,
|
||||
self.archive_header.kernel_root, // Kernel root we're checking for
|
||||
)
|
||||
.map_err(|e| {
|
||||
error!(
|
||||
"pibd_desegmenter: kernel segment idx {} failed validation: {}",
|
||||
idx, e
|
||||
);
|
||||
e
|
||||
})?;
|
||||
self.cache_kernel_segment(segment);
|
||||
debug!(
|
||||
"pibd_desegmenter: cached kernel segment idx {} (cache size {})",
|
||||
idx,
|
||||
self.kernel_segment_cache.len()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
+21
-7
@@ -16,6 +16,7 @@
|
||||
|
||||
use chrono::prelude::{DateTime, Utc};
|
||||
use chrono::Duration;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
|
||||
use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier};
|
||||
@@ -155,14 +156,17 @@ pub struct PIBDSegmentContainer {
|
||||
pub identifier: SegmentTypeIdentifier,
|
||||
/// Time at which this request was made
|
||||
pub request_time: DateTime<Utc>,
|
||||
/// Peer that most recently received this request
|
||||
pub last_peer: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
impl PIBDSegmentContainer {
|
||||
/// Return container with timestamp
|
||||
pub fn new(identifier: SegmentTypeIdentifier) -> Self {
|
||||
pub fn new(identifier: SegmentTypeIdentifier, peer_addr: Option<SocketAddr>) -> Self {
|
||||
Self {
|
||||
identifier,
|
||||
request_time: Utc::now(),
|
||||
last_peer: peer_addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -278,28 +282,38 @@ impl SyncState {
|
||||
}
|
||||
|
||||
/// Update PIBD segment list
|
||||
pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) {
|
||||
pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier, peer_addr: SocketAddr) {
|
||||
debug!("sync_state: tracking PIBD request for {:?}", id);
|
||||
self.requested_pibd_segments
|
||||
.write()
|
||||
.push(PIBDSegmentContainer::new(id.clone()));
|
||||
.push(PIBDSegmentContainer::new(id.clone(), Some(peer_addr)));
|
||||
}
|
||||
|
||||
/// Remove segment from list
|
||||
pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) {
|
||||
debug!("sync_state: removing PIBD request tracking for {:?}", id);
|
||||
self.requested_pibd_segments
|
||||
.write()
|
||||
.retain(|i| &i.identifier != id);
|
||||
}
|
||||
|
||||
/// Remove segments with request timestamps less than cutoff time
|
||||
pub fn remove_stale_pibd_requests(&self, timeout_seconds: i64) {
|
||||
pub fn remove_stale_pibd_requests(
|
||||
&self,
|
||||
timeout_seconds: i64,
|
||||
) -> Vec<(SegmentTypeIdentifier, Option<SocketAddr>)> {
|
||||
let cutoff_time = Utc::now() - Duration::seconds(timeout_seconds);
|
||||
let mut removed_segments = vec![];
|
||||
self.requested_pibd_segments.write().retain(|i| {
|
||||
if i.request_time <= cutoff_time {
|
||||
debug!("Removing + retrying PIBD request after timeout: {:?}", i)
|
||||
};
|
||||
i.request_time > cutoff_time
|
||||
debug!("Removing + retrying PIBD request after timeout: {:?}", i);
|
||||
removed_segments.push((i.identifier.clone(), i.last_peer));
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
removed_segments
|
||||
}
|
||||
|
||||
/// Check whether segment is in request list
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use crate::util::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -774,6 +775,13 @@ impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Filter to exclude peer by address.
|
||||
pub fn exclude(self, addr: Option<SocketAddr>) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
|
||||
PeersIter {
|
||||
iter: self.iter.filter(move |p| Some(p.info.addr.0) != addr),
|
||||
}
|
||||
}
|
||||
|
||||
/// Filter peers with the provided difficulty comparison fn.
|
||||
///
|
||||
/// with_difficulty(|x| x > diff)
|
||||
|
||||
@@ -250,9 +250,10 @@ impl StateSync {
|
||||
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
|
||||
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
|
||||
|
||||
// Remove stale requests, if we haven't recieved the segment within a minute re-request
|
||||
// Remove stale requests, if we haven't received the segment within a minute re-request
|
||||
// TODO: verify timing
|
||||
self.sync_state
|
||||
let stale_segments = self
|
||||
.sync_state
|
||||
.remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS);
|
||||
|
||||
// Apply segments... TODO: figure out how this should be called, might
|
||||
@@ -280,12 +281,24 @@ impl StateSync {
|
||||
// (12 is divisible by 3, to try and evenly spread the requests among the 3
|
||||
// main pmmrs. Bitmaps segments will always be requested first)
|
||||
next_segment_ids = d.next_desired_segments(pibd_params::SEGMENT_REQUEST_COUNT);
|
||||
if !next_segment_ids.is_empty() {
|
||||
debug!(
|
||||
"state_sync: requesting next PIBD segments {:?}",
|
||||
next_segment_ids
|
||||
);
|
||||
} else {
|
||||
trace!("state_sync: no PIBD segments requested this loop");
|
||||
}
|
||||
}
|
||||
|
||||
// For each segment, pick a desirable peer and send message
|
||||
// (Provided we're not waiting for a response for this message from someone else)
|
||||
for seg_id in next_segment_ids.iter() {
|
||||
if self.sync_state.contains_pibd_segment(seg_id) {
|
||||
debug!(
|
||||
"state_sync: segment {:?} already requested, waiting for response",
|
||||
seg_id
|
||||
);
|
||||
trace!("Request list contains, continuing: {:?}", seg_id);
|
||||
continue;
|
||||
}
|
||||
@@ -330,16 +343,26 @@ impl StateSync {
|
||||
self.set_earliest_zero_pibd_peer_time(None)
|
||||
}
|
||||
|
||||
// Choose a random "most work" peer, preferring outbound if at all possible.
|
||||
// Choose a random "most work" peer, excluding peer from stale segment and preferring outbound if at all possible.
|
||||
let excluded_peer = stale_segments
|
||||
.iter()
|
||||
.find(|(stale_id, _)| stale_id == seg_id)
|
||||
.and_then(|(_, addr)| *addr);
|
||||
let peer = peers_iter_pibd()
|
||||
.outbound()
|
||||
.exclude(excluded_peer)
|
||||
.choose_random()
|
||||
.or_else(|| peers_iter_pibd().inbound().choose_random());
|
||||
.or_else(|| {
|
||||
peers_iter_pibd()
|
||||
.inbound()
|
||||
.exclude(excluded_peer)
|
||||
.choose_random()
|
||||
});
|
||||
trace!("Chosen peer is {:?}", peer);
|
||||
|
||||
if let Some(p) = peer {
|
||||
// add to list of segments that are being tracked
|
||||
self.sync_state.add_pibd_segment(seg_id);
|
||||
self.sync_state.add_pibd_segment(seg_id, p.info.addr.0);
|
||||
let res = match seg_id.segment_type {
|
||||
SegmentType::Bitmap => p.send_bitmap_segment_request(
|
||||
archive_header.hash(),
|
||||
@@ -364,6 +387,23 @@ impl StateSync {
|
||||
p.info.addr, e
|
||||
);
|
||||
self.sync_state.remove_pibd_segment(seg_id);
|
||||
} else if let Some(prev_peer) = excluded_peer {
|
||||
if p.info.addr.0 != prev_peer {
|
||||
info!(
|
||||
"state_sync: retrying segment {:?} with new peer {} (previously {})",
|
||||
seg_id, p.info.addr, prev_peer
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"state_sync: requested segment {:?} from peer {}",
|
||||
seg_id, p.info.addr
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"state_sync: requested segment {:?} from peer {}",
|
||||
seg_id, p.info.addr
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user