From 52949f825a98277db96ad850df0116740a6e29f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Wed, 10 Jun 2026 09:54:36 +0100 Subject: [PATCH] chore: add retries for retrieving chain data (#6847) * chore: add retries for retrieving chain data * added retry backoff --- common/nyxd-scraper-shared/src/rpc_client.rs | 80 ++++++++++++++++++-- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/common/nyxd-scraper-shared/src/rpc_client.rs b/common/nyxd-scraper-shared/src/rpc_client.rs index 0d41e907e0..d57e090ee3 100644 --- a/common/nyxd-scraper-shared/src/rpc_client.rs +++ b/common/nyxd-scraper-shared/src/rpc_client.rs @@ -10,6 +10,7 @@ use crate::{Any, MessageRegistry, ParsedTransactionDetails, default_message_regi use futures::StreamExt; use futures::future::join3; use std::collections::BTreeMap; +use std::future::Future; use std::sync::Arc; use tendermint::{Block, Hash}; use tendermint_rpc::endpoint::{block, block_results, tx, validators}; @@ -18,6 +19,42 @@ use tokio::sync::Mutex; use tracing::{debug, instrument, warn}; use url::Url; +const MAX_QUERY_ATTEMPTS: usize = 3; + +/// Runs `op` up to `max_attempts` times (at least once), returning the first success or, on full exhaustion, the last error encountered. 
+/// Consecutive attempts are separated by a sleep of `300ms * attempt number`; nothing is awaited after the final attempt. +async fn query_with_retries(mut max_attempts: usize, op: F) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + if max_attempts == 0 { + max_attempts = 1; + } + + let mut last_err = None; + + for i in 0..max_attempts { + match op().await { + Ok(result) => return Ok(result), + Err(err) => { + debug!("query attempt {}/{max_attempts} failed - {err}", i + 1); + last_err = Some(err); + } + } + + // back off only if another attempt follows; the final failure is returned without delay + if i + 1 < max_attempts { + tokio::time::sleep(std::time::Duration::from_millis(300 * (i as u64 + 1))).await; + } + } + + // SAFETY: max_attempts >= 1, so we only reach here after at least one recorded failure + #[allow(clippy::unwrap_used)] + Err(last_err.unwrap()) +} + #[derive(Debug, Clone, Copy)] pub struct RetrievalConfig { pub get_validators: bool, @@ -173,13 +210,24 @@ impl RpcClient { }) } + #[instrument(skip(self), err(Display))] + async fn get_block_results_with_retries( + &self, + height: u32, + max_attempts: usize, + ) -> Result { + query_with_retries(max_attempts, || self.get_block_results(height)).await + } + async fn maybe_get_block_results( &self, height: u32, retrieve: bool, ) -> Result, ScraperError> { if retrieve { - self.get_block_results(height).await.map(Some) + self.get_block_results_with_retries(height, MAX_QUERY_ATTEMPTS) + .await + .map(Some) } else { Ok(None) } @@ -219,8 +267,6 @@ impl RpcClient { // "Data is just a wrapper for a list of transactions, where transactions are arbitrary byte arrays" // source: https://github.com/tendermint/spec/blob/d46cd7f573a2c6a2399fcab2cde981330aa63f37/spec/core/data_structures.md#data - // - // I hate that zip as much as you, dear reader, but for some reason the compiler didn't let me remove the `move` futures::stream::iter( raw.iter() .map(tx_hash) @@ -228,12 +274,14 @@ impl RpcClient { .zip(std::iter::repeat(ordered_results.clone())), ) .for_each_concurrent(4, |((id, tx_hash), ordered_results)| async move { - let res = self.get_transaction_result(tx_hash).await; + let res = self + .get_transaction_result_with_retries(tx_hash, 
MAX_QUERY_ATTEMPTS) + .await; ordered_results.lock().await.insert(id, res); }) .await; - // safety the futures have completed so we MUST have the only arc reference + // safety: the futures have completed so we MUST have the only arc reference #[allow(clippy::unwrap_used)] let inner = Arc::into_inner(ordered_results).unwrap().into_inner(); @@ -266,6 +314,15 @@ impl RpcClient { }) } + #[instrument(skip(self, tx_hash), fields(tx_hash = %tx_hash), err(Display))] + async fn get_transaction_result_with_retries( + &self, + tx_hash: Hash, + max_attempts: usize, + ) -> Result { + query_with_retries(max_attempts, || self.get_transaction_result(tx_hash)).await + } + #[instrument(skip(self))] pub async fn get_validators_details( &self, @@ -282,13 +339,24 @@ impl RpcClient { }) } + #[instrument(skip(self), err(Display))] + async fn get_validators_details_with_retries( + &self, + height: u32, + max_attempts: usize, + ) -> Result { + query_with_retries(max_attempts, || self.get_validators_details(height)).await + } + async fn maybe_get_validators_details( &self, height: u32, retrieve: bool, ) -> Result, ScraperError> { if retrieve { - self.get_validators_details(height).await.map(Some) + self.get_validators_details_with_retries(height, MAX_QUERY_ATTEMPTS) + .await + .map(Some) } else { Ok(None) }