From 376c85bab5fffa608e372c21e95241018e741252 Mon Sep 17 00:00:00 2001 From: ardocrat Date: Thu, 8 Jan 2026 22:40:59 +0300 Subject: [PATCH] Fix PIBD sync stuck (#3817) * pibd: record segment peer to exclude it from re-request * desegmenter is blocked on a missing kernel * segment iter * add indexing * pibd: log excluded peer * segment batch optimization * pibd: debug logging for stale requests * pibd: filter excluded peer * pibd: check existing kernel segment identifier * pibd: simplify peer selection condition --------- Co-authored-by: Joerg --- chain/src/pibd_params.rs | 5 +- chain/src/txhashset/desegmenter.rs | 349 +++++++++++++++++++--------- chain/src/types.rs | 28 ++- p2p/src/peers.rs | 8 + servers/src/grin/sync/state_sync.rs | 50 +++- 5 files changed, 316 insertions(+), 124 deletions(-) diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs index 44fa7769..fe8dbc7a 100644 --- a/chain/src/pibd_params.rs +++ b/chain/src/pibd_params.rs @@ -31,9 +31,12 @@ pub const KERNEL_SEGMENT_HEIGHT: u8 = 11; /// Maximum number of received segments to cache (across all trees) before we stop requesting others pub const MAX_CACHED_SEGMENTS: usize = 15; +/// Number of segments to apply in a single LMDB transaction +pub const SEGMENT_APPLY_BATCH_SIZE: usize = 4; + /// How long the state sync should wait after requesting a segment from a peer before /// deciding the segment isn't going to arrive. The syncer will then re-request the segment -pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60; +pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20; /// Number of simultaneous requests for segments we should make. Note this is currently /// divisible by 3 to try and evenly spread requests amount the 3 main MMRs (Bitmap segments diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index fbbe37ec..e4bb773c 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -50,6 +50,7 @@ pub struct Desegmenter { default_output_segment_height: u8, default_rangeproof_segment_height: u8, default_kernel_segment_height: u8, + segment_apply_batch_size: usize, bitmap_accumulator: BitmapAccumulator, bitmap_segment_cache: Vec>, @@ -93,6 +94,7 @@ impl Desegmenter { default_output_segment_height: pibd_params::OUTPUT_SEGMENT_HEIGHT, default_rangeproof_segment_height: pibd_params::RANGEPROOF_SEGMENT_HEIGHT, default_kernel_segment_height: pibd_params::KERNEL_SEGMENT_HEIGHT, + segment_apply_batch_size: pibd_params::SEGMENT_APPLY_BATCH_SIZE, bitmap_segment_cache: vec![], output_segment_cache: vec![], rangeproof_segment_cache: vec![], @@ -384,6 +386,12 @@ impl Desegmenter { .find(|s| s.1.identifier().idx == bmp_idx) { self.apply_bitmap_segment(idx)?; + } else { + debug!( + "desegmenter: waiting for bitmap segment idx {} (cache size {})", + bmp_idx, + self.bitmap_segment_cache.len() + ); } } else { // Check if we need to finalize bitmap @@ -394,48 +402,72 @@ impl Desegmenter { // Check if we can apply the next output segment(s) if let Some(next_output_idx) = self.next_required_output_segment_index() { - if let Some((idx, _seg)) = self - .output_segment_cache - .iter() - .enumerate() - .find(|s| s.1.identifier().idx == next_output_idx) - { - self.apply_output_segment(idx)?; - } - } else { - if self.output_segment_cache.len() >= self.max_cached_segments { - self.output_segment_cache = vec![]; + let segments = Self::take_segment_batch( + &mut self.output_segment_cache, + next_output_idx, + self.segment_apply_batch_size, + ); + if segments.is_empty() { + debug!( + "desegmenter: waiting for output segment idx {} (cache size {})", + next_output_idx, + self.output_segment_cache.len() + ); + } else { + self.apply_output_segments(segments)?; } + } else if self.output_segment_cache.len() >= self.max_cached_segments { + debug!( + "desegmenter: dropping {} cached output segments waiting for next requirement", + self.output_segment_cache.len() + ); + self.output_segment_cache = vec![]; } // Check if we can apply the next rangeproof segment if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() { - if let Some((idx, _seg)) = self - .rangeproof_segment_cache - .iter() - .enumerate() - .find(|s| s.1.identifier().idx == next_rp_idx) - { - self.apply_rangeproof_segment(idx)?; - } - } else { - if self.rangeproof_segment_cache.len() >= self.max_cached_segments { - self.rangeproof_segment_cache = vec![]; + let segments = Self::take_segment_batch( + &mut self.rangeproof_segment_cache, + next_rp_idx, + self.segment_apply_batch_size, + ); + if segments.is_empty() { + debug!( + "desegmenter: waiting for rangeproof segment idx {} (cache size {})", + next_rp_idx, + self.rangeproof_segment_cache.len() + ); + } else { + self.apply_rangeproof_segments(segments)?; } + } else if self.rangeproof_segment_cache.len() >= self.max_cached_segments { + debug!( + "desegmenter: dropping {} cached rangeproof segments waiting for next requirement", + self.rangeproof_segment_cache.len() + ); + self.rangeproof_segment_cache = vec![]; } // Check if we can apply the next kernel segment if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() { - if let Some((idx, _seg)) = self - .kernel_segment_cache - .iter() - .enumerate() - .find(|s| s.1.identifier().idx == next_kernel_idx) - { - self.apply_kernel_segment(idx)?; - } - } else { - if self.kernel_segment_cache.len() >= self.max_cached_segments { - self.kernel_segment_cache = vec![]; + let segments = Self::take_segment_batch( + &mut self.kernel_segment_cache, + next_kernel_idx, + self.segment_apply_batch_size, + ); + if segments.is_empty() { + debug!( + "desegmenter: waiting for kernel segment idx {} (cache size {})", + next_kernel_idx, + self.kernel_segment_cache.len() + ); + } else { + self.apply_kernel_segments(segments)?; } + } else if self.kernel_segment_cache.len() >= self.max_cached_segments { + debug!( + "desegmenter: dropping {} cached kernel segments waiting for next requirement", + self.kernel_segment_cache.len() + ); + self.kernel_segment_cache = vec![]; } } Ok(()) @@ -478,79 +510,105 @@ impl Desegmenter { local_kernel_mmr_size = txhashset.kernel_mmr_size(); local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); } - // TODO: Fix, alternative approach, this is very inefficient - let mut output_identifier_iter = SegmentIdentifier::traversal_iter( + let total_output_segments = SegmentIdentifier::count_segments_required( self.archive_header.output_mmr_size, self.default_output_segment_height, ); - let mut elems_added = 0; - while let Some(output_id) = output_identifier_iter.next() { - // Advance output iterator to next needed position - let (_first, last) = - output_id.segment_pos_range(self.archive_header.output_mmr_size); - if last <= local_output_mmr_size { - continue; - } - if self.output_segment_cache.len() >= self.max_cached_segments { - break; - } - if !self.has_output_segment_with_id(output_id) { - return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id)); - elems_added += 1; - } - if elems_added == max_elements / 3 { - break; + if let Some(mut next_output_idx) = self.next_required_output_segment_index() { + while (next_output_idx as usize) < total_output_segments { + if self.output_segment_cache.len() >= self.max_cached_segments { + break; + } + if elems_added == max_elements / 3 { + break; + } + let output_id = SegmentIdentifier { + height: self.default_output_segment_height, + idx: next_output_idx, + }; + let (_first, last) = + output_id.segment_pos_range(self.archive_header.output_mmr_size); + if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id)); + elems_added += 1; + } + next_output_idx += 1; } } - let mut rangeproof_identifier_iter = SegmentIdentifier::traversal_iter( + let total_rangeproof_segments = SegmentIdentifier::count_segments_required( self.archive_header.output_mmr_size, self.default_rangeproof_segment_height, ); - elems_added = 0; - while let Some(rp_id) = rangeproof_identifier_iter.next() { - let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size); - // Advance rangeproof iterator to next needed position - if last <= local_rangeproof_mmr_size { - continue; - } - if self.rangeproof_segment_cache.len() >= self.max_cached_segments { - break; - } - if !self.has_rangeproof_segment_with_id(rp_id) { - return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id)); - elems_added += 1; - } - if elems_added == max_elements / 3 { - break; + if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() { + while (next_rp_idx as usize) < total_rangeproof_segments { + if self.rangeproof_segment_cache.len() >= self.max_cached_segments { + break; + } + if elems_added == max_elements / 3 { + break; + } + let rp_id = SegmentIdentifier { + height: self.default_rangeproof_segment_height, + idx: next_rp_idx, + }; + let (_first, last) = + rp_id.segment_pos_range(self.archive_header.output_mmr_size); + if last > local_rangeproof_mmr_size + && !self.has_rangeproof_segment_with_id(rp_id) + { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id)); + elems_added += 1; + } + next_rp_idx += 1; } } - let mut kernel_identifier_iter = SegmentIdentifier::traversal_iter( + let total_kernel_segments = SegmentIdentifier::count_segments_required( self.archive_header.kernel_mmr_size, self.default_kernel_segment_height, ); - elems_added = 0; - while let Some(k_id) = kernel_identifier_iter.next() { - // Advance kernel iterator to next needed position - let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size); - // Advance rangeproof iterator to next needed position - if last <= local_kernel_mmr_size { - continue; + if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() { + while (next_kernel_idx as usize) < total_kernel_segments { + if self.kernel_segment_cache.len() >= self.max_cached_segments { + break; + } + if elems_added == max_elements / 3 { + break; + } + let k_id = SegmentIdentifier { + height: self.default_kernel_segment_height, + idx: next_kernel_idx, + }; + let (_first, last) = + k_id.segment_pos_range(self.archive_header.kernel_mmr_size); + if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id)); + elems_added += 1; + } + next_kernel_idx += 1; } - if self.kernel_segment_cache.len() >= self.max_cached_segments { - break; - } - if !self.has_kernel_segment_with_id(k_id) { - return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id)); - elems_added += 1; - } - if elems_added == max_elements / 3 { - break; + } + } + // Always ensure we explicitly ask for the very next kernel segment we are waiting on. + // The regular round-robin above can get saturated with outputs and rangeproofs while + // the desegmenter is blocked on a missing kernel, so we force this one in. + if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() { + let seg_id = SegmentIdentifier { + height: self.default_kernel_segment_height, + idx: next_kernel_idx, + }; + let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id); + if !self.has_kernel_segment_with_id(seg_id) + && !return_vec.iter().any(|x| x == &next_kernel_seg_id) + { + if return_vec.len() >= max_elements { + return_vec.pop(); } + return_vec.push(next_kernel_seg_id); } } if return_vec.is_empty() && self.bitmap_cache.is_some() { @@ -708,12 +766,36 @@ impl Desegmenter { } } - /// Apply an output segment at the index cache - pub fn apply_output_segment(&mut self, idx: usize) -> Result<(), Error> { - let segment = self.output_segment_cache.remove(idx); + fn take_segment_batch( + cache: &mut Vec>, + start_idx: u64, + max_segments: usize, + ) -> Vec> { + let mut result = Vec::new(); + let mut next_idx = start_idx; + while result.len() < max_segments { + if let Some(pos) = cache.iter().position(|s| s.identifier().idx == next_idx) { + result.push(cache.remove(pos)); + next_idx += 1; + } else { + break; + } + } + result + } + + /// Apply a batch of output segments + pub fn apply_output_segments( + &mut self, + segments: Vec>, + ) -> Result<(), Error> { + if segments.is_empty() { + return Ok(()); + } trace!( - "pibd_desegmenter: applying output segment at segment idx {}", - segment.identifier().idx + "pibd_desegmenter: applying {} output segment(s) starting at idx {}", + segments.len(), + segments.first().map(|s| s.identifier().idx).unwrap_or(0) ); let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); @@ -724,7 +806,9 @@ impl Desegmenter { &mut batch, |ext, _batch| { let extension = &mut ext.extension; - extension.apply_output_segment(segment)?; + for segment in segments { + extension.apply_output_segment(segment)?; + } Ok(()) }, )?; @@ -821,12 +905,18 @@ impl Desegmenter { } } - /// Apply a rangeproof segment at the index cache - pub fn apply_rangeproof_segment(&mut self, idx: usize) -> Result<(), Error> { - let segment = self.rangeproof_segment_cache.remove(idx); + /// Apply a batch of rangeproof segments + pub fn apply_rangeproof_segments( + &mut self, + segments: Vec>, + ) -> Result<(), Error> { + if segments.is_empty() { + return Ok(()); + } trace!( - "pibd_desegmenter: applying rangeproof segment at segment idx {}", - segment.identifier().idx + "pibd_desegmenter: applying {} rangeproof segment(s) starting at idx {}", + segments.len(), + segments.first().map(|s| s.identifier().idx).unwrap_or(0) ); let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); @@ -837,7 +927,9 @@ impl Desegmenter { &mut batch, |ext, _batch| { let extension = &mut ext.extension; - extension.apply_rangeproof_segment(segment)?; + for segment in segments { + extension.apply_rangeproof_segment(segment)?; + } Ok(()) }, )?; @@ -920,12 +1012,16 @@ impl Desegmenter { } } - /// Apply a kernel segment at the index cache - pub fn apply_kernel_segment(&mut self, idx: usize) -> Result<(), Error> { - let segment = self.kernel_segment_cache.remove(idx); - trace!( - "pibd_desegmenter: applying kernel segment at segment idx {}", - segment.identifier().idx + /// Apply a batch of kernel segments at the index cache + pub fn apply_kernel_segments(&mut self, segments: Vec>) -> Result<(), Error> { + if segments.is_empty() { + return Ok(()); + } + let first_idx = segments.first().map(|s| s.identifier().idx).unwrap_or(0); + debug!( + "pibd_desegmenter: applying {} kernel segment(s) starting at idx {}", + segments.len(), + first_idx ); let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); @@ -936,7 +1032,20 @@ impl Desegmenter { &mut batch, |ext, _batch| { let extension = &mut ext.extension; - extension.apply_kernel_segment(segment)?; + for segment in segments { + let seg_idx = segment.identifier().idx; + if let Err(e) = extension.apply_kernel_segment(segment) { + error!( + "pibd_desegmenter: failed to apply kernel segment idx {}: {}", + seg_idx, e + ); + return Err(e); + } + debug!( + "pibd_desegmenter: successfully applied kernel segment idx {}", + seg_idx + ); + } Ok(()) }, )?; @@ -985,13 +1094,31 @@ impl Desegmenter { /// Adds a Kernel segment pub fn add_kernel_segment(&mut self, segment: Segment) -> Result<(), Error> { - trace!("pibd_desegmenter: add kernel segment"); - segment.validate( - self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated - None, - self.archive_header.kernel_root, // Kernel root we're checking for - )?; + let idx = segment.identifier().idx; + debug!( + "pibd_desegmenter: received kernel segment idx {} (cache size {})", + idx, + self.kernel_segment_cache.len() + ); + segment + .validate( + self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated + None, + self.archive_header.kernel_root, // Kernel root we're checking for + ) + .map_err(|e| { + error!( + "pibd_desegmenter: kernel segment idx {} failed validation: {}", + idx, e + ); + e + })?; self.cache_kernel_segment(segment); + debug!( + "pibd_desegmenter: cached kernel segment idx {} (cache size {})", + idx, + self.kernel_segment_cache.len() + ); Ok(()) } } diff --git a/chain/src/types.rs b/chain/src/types.rs index 7797acc3..370f4a95 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -16,6 +16,7 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; +use std::net::SocketAddr; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; @@ -155,14 +156,17 @@ pub struct PIBDSegmentContainer { pub identifier: SegmentTypeIdentifier, /// Time at which this request was made pub request_time: DateTime, + /// Peer that most recently received this request + pub last_peer: Option, } impl PIBDSegmentContainer { /// Return container with timestamp - pub fn new(identifier: SegmentTypeIdentifier) -> Self { + pub fn new(identifier: SegmentTypeIdentifier, peer_addr: Option) -> Self { Self { identifier, request_time: Utc::now(), + last_peer: peer_addr, } } } @@ -278,28 +282,38 @@ impl SyncState { } /// Update PIBD segment list - pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) { + pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier, peer_addr: SocketAddr) { + debug!("sync_state: tracking PIBD request for {:?}", id); self.requested_pibd_segments .write() - .push(PIBDSegmentContainer::new(id.clone())); + .push(PIBDSegmentContainer::new(id.clone(), Some(peer_addr))); } /// Remove segment from list pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) { + debug!("sync_state: removing PIBD request tracking for {:?}", id); self.requested_pibd_segments .write() .retain(|i| &i.identifier != id); } /// Remove segments with request timestamps less than cutoff time - pub fn remove_stale_pibd_requests(&self, timeout_seconds: i64) { + pub fn remove_stale_pibd_requests( + &self, + timeout_seconds: i64, + ) -> Vec<(SegmentTypeIdentifier, Option)> { let cutoff_time = Utc::now() - Duration::seconds(timeout_seconds); + let mut removed_segments = vec![]; self.requested_pibd_segments.write().retain(|i| { if i.request_time <= cutoff_time { - debug!("Removing + retrying PIBD request after timeout: {:?}", i) - }; - i.request_time > cutoff_time + debug!("Removing + retrying PIBD request after timeout: {:?}", i); + removed_segments.push((i.identifier.clone(), i.last_peer)); + false + } else { + true + } }); + removed_segments } /// Check whether segment is in request list diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 2ee8b548..adc24a4c 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -15,6 +15,7 @@ use crate::util::RwLock; use std::collections::HashMap; use std::fs::File; +use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; @@ -774,6 +775,13 @@ impl>> PeersIter { } } + /// Filter to exclude peer by address. + pub fn exclude(self, addr: Option) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(move |p| Some(p.info.addr.0) != addr), + } + } + /// Filter peers with the provided difficulty comparison fn. /// /// with_difficulty(|x| x > diff) diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index bdd8d1fe..aaee2e6a 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -250,9 +250,10 @@ impl StateSync { let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); - // Remove stale requests, if we haven't recieved the segment within a minute re-request + // Remove stale requests, if we haven't received the segment within a minute re-request // TODO: verify timing - self.sync_state + let stale_segments = self + .sync_state .remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS); // Apply segments... TODO: figure out how this should be called, might @@ -280,12 +281,24 @@ impl StateSync { // (12 is divisible by 3, to try and evenly spread the requests among the 3 // main pmmrs. Bitmaps segments will always be requested first) next_segment_ids = d.next_desired_segments(pibd_params::SEGMENT_REQUEST_COUNT); + if !next_segment_ids.is_empty() { + debug!( + "state_sync: requesting next PIBD segments {:?}", + next_segment_ids + ); + } else { + trace!("state_sync: no PIBD segments requested this loop"); + } } // For each segment, pick a desirable peer and send message // (Provided we're not waiting for a response for this message from someone else) for seg_id in next_segment_ids.iter() { if self.sync_state.contains_pibd_segment(seg_id) { + debug!( + "state_sync: segment {:?} already requested, waiting for response", + seg_id + ); trace!("Request list contains, continuing: {:?}", seg_id); continue; } @@ -330,16 +343,26 @@ impl StateSync { self.set_earliest_zero_pibd_peer_time(None) } - // Choose a random "most work" peer, preferring outbound if at all possible. + // Choose a random "most work" peer, excluding peer from stale segment and preferring outbound if at all possible. + let excluded_peer = stale_segments + .iter() + .find(|(stale_id, _)| stale_id == seg_id) + .and_then(|(_, addr)| *addr); let peer = peers_iter_pibd() .outbound() + .exclude(excluded_peer) .choose_random() - .or_else(|| peers_iter_pibd().inbound().choose_random()); + .or_else(|| { + peers_iter_pibd() + .inbound() + .exclude(excluded_peer) + .choose_random() + }); trace!("Chosen peer is {:?}", peer); if let Some(p) = peer { // add to list of segments that are being tracked - self.sync_state.add_pibd_segment(seg_id); + self.sync_state.add_pibd_segment(seg_id, p.info.addr.0); let res = match seg_id.segment_type { SegmentType::Bitmap => p.send_bitmap_segment_request( archive_header.hash(), @@ -364,6 +387,23 @@ impl StateSync { p.info.addr, e ); self.sync_state.remove_pibd_segment(seg_id); + } else if let Some(prev_peer) = excluded_peer { + if p.info.addr.0 != prev_peer { + info!( + "state_sync: retrying segment {:?} with new peer {} (previously {})", + seg_id, p.info.addr, prev_peer + ); + } else { + debug!( + "state_sync: requested segment {:?} from peer {}", + seg_id, p.info.addr + ); + } + } else { + debug!( + "state_sync: requested segment {:?} from peer {}", + seg_id, p.info.addr + ); } } }